YARN-136. Make ClientToAMTokenSecretManager part of RMContext (Contributed by Vinod Kumar Vavilapalli)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1400278 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1195f844a9
commit
f79ae91414
|
@ -64,6 +64,8 @@ Release 2.0.3-alpha - Unreleased
|
||||||
HADOOP-8911. CRLF characters in source and text files.
|
HADOOP-8911. CRLF characters in source and text files.
|
||||||
(Raja Aluri via suresh)
|
(Raja Aluri via suresh)
|
||||||
|
|
||||||
|
YARN-136. Make ClientToAMTokenSecretManager part of RMContext (Vinod Kumar
|
||||||
|
Vavilapalli via sseth)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -58,19 +57,16 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
|
||||||
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
|
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
|
||||||
|
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
private final ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
|
|
||||||
private final ApplicationMasterService masterService;
|
private final ApplicationMasterService masterService;
|
||||||
private final YarnScheduler scheduler;
|
private final YarnScheduler scheduler;
|
||||||
private final ApplicationACLsManager applicationACLsManager;
|
private final ApplicationACLsManager applicationACLsManager;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
public RMAppManager(RMContext context,
|
public RMAppManager(RMContext context,
|
||||||
ClientToAMTokenSecretManagerInRM clientToAMSecretManager,
|
|
||||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
this.rmContext = context;
|
this.rmContext = context;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
|
||||||
this.masterService = masterService;
|
this.masterService = masterService;
|
||||||
this.applicationACLsManager = applicationACLsManager;
|
this.applicationACLsManager = applicationACLsManager;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
@ -230,14 +226,18 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
RMApp application = null;
|
RMApp application = null;
|
||||||
try {
|
try {
|
||||||
// TODO: This needs to move to per-AppAttempt
|
|
||||||
this.clientToAMSecretManager.registerApplication(applicationId);
|
|
||||||
String clientTokenStr = null;
|
String clientTokenStr = null;
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
|
||||||
|
// TODO: This needs to move to per-AppAttempt
|
||||||
|
this.rmContext.getClientToAMTokenSecretManager().registerApplication(
|
||||||
|
applicationId);
|
||||||
|
|
||||||
Token<ClientTokenIdentifier> clientToken = new
|
Token<ClientTokenIdentifier> clientToken = new
|
||||||
Token<ClientTokenIdentifier>(
|
Token<ClientTokenIdentifier>(
|
||||||
new ClientTokenIdentifier(applicationId),
|
new ClientTokenIdentifier(applicationId),
|
||||||
this.clientToAMSecretManager);
|
this.rmContext.getClientToAMTokenSecretManager());
|
||||||
clientTokenStr = clientToken.encodeToUrlString();
|
clientTokenStr = clientToken.encodeToUrlString();
|
||||||
LOG.debug("Sending client token as " + clientTokenStr);
|
LOG.debug("Sending client token as " + clientTokenStr);
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
|
||||||
|
@ -61,4 +62,6 @@ public interface RMContext {
|
||||||
ApplicationTokenSecretManager getApplicationTokenSecretManager();
|
ApplicationTokenSecretManager getApplicationTokenSecretManager();
|
||||||
|
|
||||||
RMContainerTokenSecretManager getContainerTokenSecretManager();
|
RMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||||
|
|
||||||
|
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
|
||||||
}
|
}
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
|
||||||
|
@ -55,6 +56,7 @@ public class RMContextImpl implements RMContext {
|
||||||
private final DelegationTokenRenewer tokenRenewer;
|
private final DelegationTokenRenewer tokenRenewer;
|
||||||
private final ApplicationTokenSecretManager appTokenSecretManager;
|
private final ApplicationTokenSecretManager appTokenSecretManager;
|
||||||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
|
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
||||||
|
|
||||||
public RMContextImpl(Store store, Dispatcher rmDispatcher,
|
public RMContextImpl(Store store, Dispatcher rmDispatcher,
|
||||||
ContainerAllocationExpirer containerAllocationExpirer,
|
ContainerAllocationExpirer containerAllocationExpirer,
|
||||||
|
@ -62,7 +64,8 @@ public class RMContextImpl implements RMContext {
|
||||||
AMLivelinessMonitor amFinishingMonitor,
|
AMLivelinessMonitor amFinishingMonitor,
|
||||||
DelegationTokenRenewer tokenRenewer,
|
DelegationTokenRenewer tokenRenewer,
|
||||||
ApplicationTokenSecretManager appTokenSecretManager,
|
ApplicationTokenSecretManager appTokenSecretManager,
|
||||||
RMContainerTokenSecretManager containerTokenSecretManager) {
|
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
|
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.rmDispatcher = rmDispatcher;
|
this.rmDispatcher = rmDispatcher;
|
||||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||||
|
@ -71,6 +74,7 @@ public class RMContextImpl implements RMContext {
|
||||||
this.tokenRenewer = tokenRenewer;
|
this.tokenRenewer = tokenRenewer;
|
||||||
this.appTokenSecretManager = appTokenSecretManager;
|
this.appTokenSecretManager = appTokenSecretManager;
|
||||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||||
|
this.clientToAMTokenSecretManager = clientTokenSecretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -132,4 +136,9 @@ public class RMContextImpl implements RMContext {
|
||||||
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||||
return this.containerTokenSecretManager;
|
return this.containerTokenSecretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
||||||
|
return this.clientToAMTokenSecretManager;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -164,7 +164,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
new RMContextImpl(this.store, this.rmDispatcher,
|
new RMContextImpl(this.store, this.rmDispatcher,
|
||||||
this.containerAllocationExpirer, amLivelinessMonitor,
|
this.containerAllocationExpirer, amLivelinessMonitor,
|
||||||
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
|
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
|
||||||
this.containerTokenSecretManager);
|
this.containerTokenSecretManager, this.clientToAMSecretManager);
|
||||||
|
|
||||||
// Register event handler for NodesListManager
|
// Register event handler for NodesListManager
|
||||||
this.nodesListManager = new NodesListManager(this.rmContext);
|
this.nodesListManager = new NodesListManager(this.rmContext);
|
||||||
|
@ -273,8 +273,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
} }
|
} }
|
||||||
|
|
||||||
protected ApplicationMasterLauncher createAMLauncher() {
|
protected ApplicationMasterLauncher createAMLauncher() {
|
||||||
return new ApplicationMasterLauncher(this.clientToAMSecretManager,
|
return new ApplicationMasterLauncher(this.rmContext);
|
||||||
this.rmContext);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private NMLivelinessMonitor createNMLivelinessMonitor() {
|
private NMLivelinessMonitor createNMLivelinessMonitor() {
|
||||||
|
@ -291,9 +290,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RMAppManager createRMAppManager() {
|
protected RMAppManager createRMAppManager() {
|
||||||
return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
|
return new RMAppManager(this.rmContext, this.scheduler, this.masterService,
|
||||||
this.scheduler, this.masterService, this.applicationACLsManager,
|
this.applicationACLsManager, this.conf);
|
||||||
this.conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
|
|
@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
||||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,7 +75,6 @@ public class AMLauncher implements Runnable {
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final RecordFactory recordFactory =
|
private final RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
private final ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
|
|
||||||
private final AMLauncherEventType eventType;
|
private final AMLauncherEventType eventType;
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
|
|
||||||
|
@ -84,11 +82,9 @@ public class AMLauncher implements Runnable {
|
||||||
private final EventHandler handler;
|
private final EventHandler handler;
|
||||||
|
|
||||||
public AMLauncher(RMContext rmContext, RMAppAttempt application,
|
public AMLauncher(RMContext rmContext, RMAppAttempt application,
|
||||||
AMLauncherEventType eventType,
|
AMLauncherEventType eventType, Configuration conf) {
|
||||||
ClientToAMTokenSecretManagerInRM clientToAMSecretManager, Configuration conf) {
|
|
||||||
this.application = application;
|
this.application = application;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
|
||||||
this.eventType = eventType;
|
this.eventType = eventType;
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
this.handler = rmContext.getDispatcher().getEventHandler();
|
this.handler = rmContext.getDispatcher().getEventHandler();
|
||||||
|
@ -240,7 +236,8 @@ public class AMLauncher implements Runnable {
|
||||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
|
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
|
||||||
|
|
||||||
SecretKey clientSecretKey =
|
SecretKey clientSecretKey =
|
||||||
this.clientToAMSecretManager.getMasterKey(applicationId);
|
this.rmContext.getClientToAMTokenSecretManager().getMasterKey(
|
||||||
|
applicationId);
|
||||||
String encoded =
|
String encoded =
|
||||||
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
|
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
|
||||||
environment.put(
|
environment.put(
|
||||||
|
|
|
@ -25,10 +25,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.client.BaseClientToAMTokenSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
|
||||||
|
|
||||||
|
@ -42,17 +40,14 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
||||||
private final BlockingQueue<Runnable> masterEvents
|
private final BlockingQueue<Runnable> masterEvents
|
||||||
= new LinkedBlockingQueue<Runnable>();
|
= new LinkedBlockingQueue<Runnable>();
|
||||||
|
|
||||||
private ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
|
|
||||||
protected final RMContext context;
|
protected final RMContext context;
|
||||||
|
|
||||||
public ApplicationMasterLauncher(
|
public ApplicationMasterLauncher(RMContext context) {
|
||||||
ClientToAMTokenSecretManagerInRM clientToAMSecretManager, RMContext context) {
|
|
||||||
super(ApplicationMasterLauncher.class.getName());
|
super(ApplicationMasterLauncher.class.getName());
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.launcherPool = new ThreadPoolExecutor(10, 10, 1,
|
this.launcherPool = new ThreadPoolExecutor(10, 10, 1,
|
||||||
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
||||||
this.launcherHandlingThread = new LauncherThread();
|
this.launcherHandlingThread = new LauncherThread();
|
||||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
|
@ -63,8 +58,7 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
||||||
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
||||||
AMLauncherEventType event) {
|
AMLauncherEventType event) {
|
||||||
Runnable launcher =
|
Runnable launcher =
|
||||||
new AMLauncher(context, application, event, clientToAMSecretManager,
|
new AMLauncher(context, application, event, getConfig());
|
||||||
getConfig());
|
|
||||||
return launcher;
|
return launcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||||
|
@ -240,8 +240,7 @@ public class MockRM extends ResourceManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ApplicationMasterLauncher createAMLauncher() {
|
protected ApplicationMasterLauncher createAMLauncher() {
|
||||||
return new ApplicationMasterLauncher(this.clientToAMSecretManager,
|
return new ApplicationMasterLauncher(getRMContext()) {
|
||||||
getRMContext()) {
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
// override to not start rpc handler
|
// override to not start rpc handler
|
||||||
|
|
|
@ -42,13 +42,11 @@ public class MockRMWithCustomAMLauncher extends MockRM {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ApplicationMasterLauncher createAMLauncher() {
|
protected ApplicationMasterLauncher createAMLauncher() {
|
||||||
return new ApplicationMasterLauncher(super.clientToAMSecretManager,
|
return new ApplicationMasterLauncher(getRMContext()) {
|
||||||
getRMContext()) {
|
|
||||||
@Override
|
@Override
|
||||||
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
||||||
AMLauncherEventType event) {
|
AMLauncherEventType event) {
|
||||||
return new AMLauncher(context, application, event,
|
return new AMLauncher(context, application, event, getConfig()) {
|
||||||
clientToAMSecretManager, getConfig()) {
|
|
||||||
@Override
|
@Override
|
||||||
protected ContainerManager getContainerMgrProxy(
|
protected ContainerManager getContainerMgrProxy(
|
||||||
ContainerId containerId) {
|
ContainerId containerId) {
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class TestAppManager{
|
||||||
rmDispatcher);
|
rmDispatcher);
|
||||||
return new RMContextImpl(new MemStore(), rmDispatcher,
|
return new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
null, null, null) {
|
null, null, null, null) {
|
||||||
@Override
|
@Override
|
||||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||||
return map;
|
return map;
|
||||||
|
@ -135,7 +135,7 @@ public class TestAppManager{
|
||||||
public class TestRMAppManager extends RMAppManager {
|
public class TestRMAppManager extends RMAppManager {
|
||||||
|
|
||||||
public TestRMAppManager(RMContext context, Configuration conf) {
|
public TestRMAppManager(RMContext context, Configuration conf) {
|
||||||
super(context, null, null, null, new ApplicationACLsManager(conf), conf);
|
super(context, null, null, new ApplicationACLsManager(conf), conf);
|
||||||
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,8 +143,7 @@ public class TestAppManager{
|
||||||
ClientToAMTokenSecretManagerInRM clientToAMSecretManager,
|
ClientToAMTokenSecretManagerInRM clientToAMSecretManager,
|
||||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
super(context, clientToAMSecretManager, scheduler, masterService,
|
super(context, scheduler, masterService, applicationACLsManager, conf);
|
||||||
applicationACLsManager, conf);
|
|
||||||
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
@ -81,7 +80,7 @@ public class TestRMNodeTransitions {
|
||||||
|
|
||||||
rmContext =
|
rmContext =
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher, null, null, null,
|
new RMContextImpl(new MemStore(), rmDispatcher, null, null, null,
|
||||||
mock(DelegationTokenRenewer.class), null, null);
|
mock(DelegationTokenRenewer.class), null, null, null);
|
||||||
scheduler = mock(YarnScheduler.class);
|
scheduler = mock(YarnScheduler.class);
|
||||||
doAnswer(
|
doAnswer(
|
||||||
new Answer<Void>() {
|
new Answer<Void>() {
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class TestNMExpiry {
|
||||||
// Dispatcher that processes events inline
|
// Dispatcher that processes events inline
|
||||||
Dispatcher dispatcher = new InlineDispatcher();
|
Dispatcher dispatcher = new InlineDispatcher();
|
||||||
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
dispatcher.register(SchedulerEventType.class,
|
dispatcher.register(SchedulerEventType.class,
|
||||||
new InlineDispatcher.EmptyEventHandler());
|
new InlineDispatcher.EmptyEventHandler());
|
||||||
dispatcher.register(RMNodeEventType.class,
|
dispatcher.register(RMNodeEventType.class,
|
||||||
|
|
|
@ -66,8 +66,8 @@ public class TestRMNMRPCResponseId {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
RMContext context =
|
RMContext context =
|
||||||
new RMContextImpl(new MemStore(), dispatcher, null, null, null,
|
new RMContextImpl(new MemStore(), dispatcher, null, null, null, null,
|
||||||
null, null, null);
|
null, null, null);
|
||||||
dispatcher.register(RMNodeEventType.class,
|
dispatcher.register(RMNodeEventType.class,
|
||||||
new ResourceManager.NodeEventDispatcher(context));
|
new ResourceManager.NodeEventDispatcher(context));
|
||||||
NodesListManager nodesListManager = new NodesListManager(context);
|
NodesListManager nodesListManager = new NodesListManager(context);
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -142,7 +143,8 @@ public class TestRMAppTransitions {
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
null, new ApplicationTokenSecretManager(conf),
|
null, new ApplicationTokenSecretManager(conf),
|
||||||
new RMContainerTokenSecretManager(conf));
|
new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM());
|
||||||
|
|
||||||
rmDispatcher.register(RMAppAttemptEventType.class,
|
rmDispatcher.register(RMAppAttemptEventType.class,
|
||||||
new TestApplicationAttemptEventDispatcher(this.rmContext));
|
new TestApplicationAttemptEventDispatcher(this.rmContext));
|
||||||
|
|
|
@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -160,7 +161,8 @@ public class TestRMAppAttemptTransitions {
|
||||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
null, new ApplicationTokenSecretManager(conf),
|
null, new ApplicationTokenSecretManager(conf),
|
||||||
new RMContainerTokenSecretManager(conf));
|
new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM());
|
||||||
|
|
||||||
scheduler = mock(YarnScheduler.class);
|
scheduler = mock(YarnScheduler.class);
|
||||||
masterService = mock(ApplicationMasterService.class);
|
masterService = mock(ApplicationMasterService.class);
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -250,7 +251,8 @@ public class TestCapacityScheduler {
|
||||||
setupQueueConfiguration(conf);
|
setupQueueConfiguration(conf);
|
||||||
cs.setConf(new YarnConfiguration());
|
cs.setConf(new YarnConfiguration());
|
||||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
||||||
null, new RMContainerTokenSecretManager(conf)));
|
null, new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM()));
|
||||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
conf.setCapacity(A, 80f);
|
conf.setCapacity(A, 80f);
|
||||||
|
@ -347,7 +349,8 @@ public class TestCapacityScheduler {
|
||||||
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||||
|
|
||||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
||||||
null, new RMContainerTokenSecretManager(conf)));
|
null, new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -357,8 +360,9 @@ public class TestCapacityScheduler {
|
||||||
setupQueueConfiguration(csConf);
|
setupQueueConfiguration(csConf);
|
||||||
CapacityScheduler cs = new CapacityScheduler();
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
cs.setConf(new YarnConfiguration());
|
cs.setConf(new YarnConfiguration());
|
||||||
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, null,
|
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
|
||||||
null, new RMContainerTokenSecretManager(csConf)));
|
null, null, new RMContainerTokenSecretManager(csConf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM()));
|
||||||
|
|
||||||
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
|
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
|
||||||
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
|
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -43,7 +44,8 @@ public class TestQueueParsing {
|
||||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
capacityScheduler.setConf(conf);
|
capacityScheduler.setConf(conf);
|
||||||
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, null,
|
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, null,
|
||||||
null, null, null, null, new RMContainerTokenSecretManager(conf)));
|
null, null, null, null, new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM()));
|
||||||
|
|
||||||
CSQueue a = capacityScheduler.getQueue("a");
|
CSQueue a = capacityScheduler.getQueue("a");
|
||||||
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
|
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
|
||||||
public class TestUtils {
|
public class TestUtils {
|
||||||
|
@ -84,7 +85,8 @@ public class TestUtils {
|
||||||
RMContext rmContext =
|
RMContext rmContext =
|
||||||
new RMContextImpl(null, nullDispatcher, cae, null, null, null,
|
new RMContextImpl(null, nullDispatcher, cae, null, null, null,
|
||||||
new ApplicationTokenSecretManager(conf),
|
new ApplicationTokenSecretManager(conf),
|
||||||
new RMContainerTokenSecretManager(conf));
|
new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM());
|
||||||
|
|
||||||
return rmContext;
|
return rmContext;
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ public class TestFifoScheduler {
|
||||||
public void testAppAttemptMetrics() throws Exception {
|
public void testAppAttemptMetrics() throws Exception {
|
||||||
AsyncDispatcher dispatcher = new InlineDispatcher();
|
AsyncDispatcher dispatcher = new InlineDispatcher();
|
||||||
RMContext rmContext = new RMContextImpl(null, dispatcher, null,
|
RMContext rmContext = new RMContextImpl(null, dispatcher, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
|
|
||||||
FifoScheduler schedular = new FifoScheduler();
|
FifoScheduler schedular = new FifoScheduler();
|
||||||
schedular.reinitialize(new Configuration(), rmContext);
|
schedular.reinitialize(new Configuration(), rmContext);
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.StringHelper;
|
import org.apache.hadoop.yarn.util.StringHelper;
|
||||||
|
@ -160,7 +161,7 @@ public class TestRMWebApp {
|
||||||
deactivatedNodesMap.put(node.getHostName(), node);
|
deactivatedNodesMap.put(node.getHostName(), node);
|
||||||
}
|
}
|
||||||
return new RMContextImpl(new MemStore(), null, null, null, null,
|
return new RMContextImpl(new MemStore(), null, null, null, null,
|
||||||
null, null, null) {
|
null, null, null, null) {
|
||||||
@Override
|
@Override
|
||||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||||
return applicationsMaps;
|
return applicationsMaps;
|
||||||
|
@ -201,7 +202,8 @@ public class TestRMWebApp {
|
||||||
CapacityScheduler cs = new CapacityScheduler();
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
cs.setConf(new YarnConfiguration());
|
cs.setConf(new YarnConfiguration());
|
||||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
||||||
null, new RMContainerTokenSecretManager(conf)));
|
null, new RMContainerTokenSecretManager(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM()));
|
||||||
return cs;
|
return cs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue