Merge MAPREDUCE-3942 from trunk. Randomize master key generation for ApplicationTokenSecretManager and roll it every so often. (Contributed by Vinod Kumar Vavilapalli)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1327221 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0e138756d4
commit
ab8ea1db58
|
@ -157,6 +157,10 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-4059. The history server should have a separate pluggable
|
||||
storage/query interface. (Robert Evans via tgraves)
|
||||
|
||||
MAPREDUCE-3942. Randomize master key generation for
|
||||
ApplicationTokenSecretManager and roll it every so often. (Vinod Kumar
|
||||
Vavilapalli via sseth)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -56,7 +56,12 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol {
|
|||
AMRMProtocolPB.class, clientVersion, addr, conf);
|
||||
}
|
||||
|
||||
|
||||
public void close() {
|
||||
if (this.proxy != null) {
|
||||
RPC.stopProxy(this.proxy);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnRemoteException {
|
||||
|
|
|
@ -246,6 +246,12 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS =
|
||||
"60,300,1440";
|
||||
|
||||
public static final String RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX
|
||||
+ "application-tokens.master-key-rolling-interval-secs";
|
||||
|
||||
public static final long DEFAULT_RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
|
||||
24 * 60 * 60;
|
||||
|
||||
////////////////////////////////
|
||||
// Node Manager Configs
|
||||
////////////////////////////////
|
||||
|
|
|
@ -23,34 +23,55 @@ import java.io.DataOutput;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
/**
|
||||
* ApplicationTokenIdentifier is the TokenIdentifier to be used by
|
||||
* ApplicationMasters to authenticate to the ResourceManager.
|
||||
*/
|
||||
public class ApplicationTokenIdentifier extends TokenIdentifier {
|
||||
|
||||
public static final Text KIND_NAME = new Text("YARN_APPLICATION_TOKEN");
|
||||
|
||||
private String applicationAttemptId;
|
||||
private ApplicationAttemptId applicationAttemptId;
|
||||
|
||||
public ApplicationTokenIdentifier() {
|
||||
}
|
||||
|
||||
public ApplicationTokenIdentifier(ApplicationAttemptId appAttemptId) {
|
||||
this();
|
||||
this.applicationAttemptId = appAttemptId.toString();
|
||||
this.applicationAttemptId = appAttemptId;
|
||||
}
|
||||
|
||||
@Private
|
||||
public ApplicationAttemptId getApplicationAttemptId() {
|
||||
return this.applicationAttemptId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, this.applicationAttemptId);
|
||||
ApplicationId appId = this.applicationAttemptId.getApplicationId();
|
||||
out.writeLong(appId.getClusterTimestamp());
|
||||
out.writeInt(appId.getId());
|
||||
out.writeInt(this.applicationAttemptId.getAttemptId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.applicationAttemptId = Text.readString(in);
|
||||
long clusterTimeStamp = in.readLong();
|
||||
int appId = in.readInt();
|
||||
int attemptId = in.readInt();
|
||||
ApplicationId applicationId =
|
||||
BuilderUtils.newApplicationId(clusterTimeStamp, appId);
|
||||
this.applicationAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(applicationId, attemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -68,6 +89,7 @@ public class ApplicationTokenIdentifier extends TokenIdentifier {
|
|||
.toString());
|
||||
}
|
||||
|
||||
// TODO: Needed?
|
||||
@InterfaceAudience.Private
|
||||
public static class Renewer extends Token.TrivialRenewer {
|
||||
@Override
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.security;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
|
||||
public class ApplicationTokenSecretManager extends
|
||||
SecretManager<ApplicationTokenIdentifier> {
|
||||
|
||||
// TODO: mark as final
|
||||
private SecretKey masterKey; // For now only one masterKey, for ever.
|
||||
|
||||
// TODO: add expiry for masterKey
|
||||
// TODO: add logic to handle with multiple masterKeys, only one being used for
|
||||
// creating new tokens at any time.
|
||||
// TODO: Make he masterKey more secure, non-transferrable etc.
|
||||
|
||||
/**
|
||||
* Default constructor
|
||||
*/
|
||||
public ApplicationTokenSecretManager() {
|
||||
this.masterKey = generateSecret();
|
||||
}
|
||||
|
||||
// TODO: this should go away.
|
||||
public void setMasterKey(SecretKey mk) {
|
||||
this.masterKey = mk;
|
||||
}
|
||||
|
||||
// TODO: this should go away.
|
||||
public SecretKey getMasterKey() {
|
||||
return masterKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the byte[] to a secret key
|
||||
* @param key the byte[] to create the secret key from
|
||||
* @return the secret key
|
||||
*/
|
||||
public static SecretKey createSecretKey(byte[] key) {
|
||||
return SecretManager.createSecretKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] createPassword(ApplicationTokenIdentifier identifier) {
|
||||
return createPassword(identifier.getBytes(), masterKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(ApplicationTokenIdentifier identifier)
|
||||
throws SecretManager.InvalidToken {
|
||||
return createPassword(identifier.getBytes(), masterKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationTokenIdentifier createIdentifier() {
|
||||
return new ApplicationTokenIdentifier();
|
||||
}
|
||||
|
||||
}
|
|
@ -215,6 +215,14 @@
|
|||
<value>30000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Interval for the roll over for the master key used to generate
|
||||
application tokens
|
||||
</description>
|
||||
<name>yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs</name>
|
||||
<value>86400</value>
|
||||
</property>
|
||||
|
||||
<!-- Node Manager Configs -->
|
||||
<property>
|
||||
<description>address of node manager IPC.</description>
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sec
|
|||
|
||||
import java.lang.annotation.Annotation;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.SecurityInfo;
|
||||
|
@ -30,6 +32,8 @@ import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB;
|
|||
|
||||
public class LocalizerSecurityInfo extends SecurityInfo {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(LocalizerSecurityInfo.class);
|
||||
|
||||
@Override
|
||||
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
|
||||
return null;
|
||||
|
@ -51,7 +55,7 @@ public class LocalizerSecurityInfo extends SecurityInfo {
|
|||
@Override
|
||||
public Class<? extends TokenSelector<? extends TokenIdentifier>>
|
||||
value() {
|
||||
System.err.print("=========== Using localizerTokenSecurityInfo");
|
||||
LOG.debug("Using localizerTokenSecurityInfo");
|
||||
return LocalizerTokenSelector.class;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
|
||||
|
@ -72,14 +71,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicy
|
|||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Private
|
||||
public class ApplicationMasterService extends AbstractService implements
|
||||
AMRMProtocol {
|
||||
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
|
||||
private final AMLivelinessMonitor amLivelinessMonitor;
|
||||
private YarnScheduler rScheduler;
|
||||
private ApplicationTokenSecretManager appTokenManager;
|
||||
private InetSocketAddress masterServiceAddress;
|
||||
private InetSocketAddress bindAddress;
|
||||
private Server server;
|
||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
|
||||
|
@ -87,35 +86,31 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
|
||||
private final RMContext rmContext;
|
||||
|
||||
public ApplicationMasterService(RMContext rmContext,
|
||||
ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
|
||||
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
|
||||
super(ApplicationMasterService.class.getName());
|
||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||
this.appTokenManager = appTokenManager;
|
||||
this.rScheduler = scheduler;
|
||||
this.reboot.setReboot(true);
|
||||
// this.reboot.containers = new ArrayList<Container>();
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
String bindAddress =
|
||||
conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
masterServiceAddress = NetUtils.createSocketAddr(bindAddress,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT,
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
|
||||
String bindAddressStr =
|
||||
conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
InetSocketAddress masterServiceAddress =
|
||||
NetUtils.createSocketAddr(bindAddressStr,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT,
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
||||
|
||||
this.server =
|
||||
rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
|
||||
conf, this.appTokenManager,
|
||||
conf, this.rmContext.getApplicationTokenSecretManager(),
|
||||
conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
|
||||
|
||||
|
@ -127,9 +122,19 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
}
|
||||
|
||||
this.server.start();
|
||||
|
||||
this.bindAddress =
|
||||
NetUtils.createSocketAddr(masterServiceAddress.getHostName(),
|
||||
this.server.getPort());
|
||||
|
||||
super.start();
|
||||
}
|
||||
|
||||
@Private
|
||||
public InetSocketAddress getBindAddress() {
|
||||
return this.bindAddress;
|
||||
}
|
||||
|
||||
private void authorizeRequest(ApplicationAttemptId appAttemptID)
|
||||
throws YarnRemoteException {
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
|
||||
/**
|
||||
|
@ -53,4 +54,6 @@ public interface RMContext {
|
|||
ContainerAllocationExpirer getContainerAllocationExpirer();
|
||||
|
||||
DelegationTokenRenewer getDelegationTokenRenewer();
|
||||
|
||||
ApplicationTokenSecretManager getApplicationTokenSecretManager();
|
||||
}
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
|
||||
public class RMContextImpl implements RMContext {
|
||||
|
@ -50,16 +51,19 @@ public class RMContextImpl implements RMContext {
|
|||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private final DelegationTokenRenewer tokenRenewer;
|
||||
private final ApplicationTokenSecretManager appTokenSecretManager;
|
||||
|
||||
public RMContextImpl(Store store, Dispatcher rmDispatcher,
|
||||
ContainerAllocationExpirer containerAllocationExpirer,
|
||||
AMLivelinessMonitor amLivelinessMonitor,
|
||||
DelegationTokenRenewer tokenRenewer) {
|
||||
DelegationTokenRenewer tokenRenewer,
|
||||
ApplicationTokenSecretManager appTokenSecretManager) {
|
||||
this.store = store;
|
||||
this.rmDispatcher = rmDispatcher;
|
||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||
this.tokenRenewer = tokenRenewer;
|
||||
this.appTokenSecretManager = appTokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -106,4 +110,9 @@ public class RMContextImpl implements RMContext {
|
|||
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
||||
return tokenRenewer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationTokenSecretManager getApplicationTokenSecretManager() {
|
||||
return this.appTokenSecretManager;
|
||||
}
|
||||
}
|
|
@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||
|
@ -65,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
|
@ -82,8 +81,10 @@ import org.apache.hadoop.yarn.webapp.WebApps.Builder;
|
|||
|
||||
/**
|
||||
* The ResourceManager is the main class that is a set of components.
|
||||
* "I am the ResourceManager. All your resources are belong to us..."
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public class ResourceManager extends CompositeService implements Recoverable {
|
||||
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
||||
public static final long clusterTimeStamp = System.currentTimeMillis();
|
||||
|
@ -94,8 +95,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
protected ContainerTokenSecretManager containerTokenSecretManager =
|
||||
new ContainerTokenSecretManager();
|
||||
|
||||
protected ApplicationTokenSecretManager appTokenSecretManager =
|
||||
new ApplicationTokenSecretManager();
|
||||
protected ApplicationTokenSecretManager appTokenSecretManager;
|
||||
|
||||
private Dispatcher rmDispatcher;
|
||||
|
||||
|
@ -137,6 +137,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.rmDispatcher = createDispatcher();
|
||||
addIfService(this.rmDispatcher);
|
||||
|
||||
this.appTokenSecretManager = createApplicationTokenSecretManager(conf);
|
||||
|
||||
this.containerAllocationExpirer = new ContainerAllocationExpirer(
|
||||
this.rmDispatcher);
|
||||
addService(this.containerAllocationExpirer);
|
||||
|
@ -147,8 +149,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
||||
addService(tokenRenewer);
|
||||
|
||||
this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
|
||||
this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer);
|
||||
this.rmContext =
|
||||
new RMContextImpl(this.store, this.rmDispatcher,
|
||||
this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer,
|
||||
this.appTokenSecretManager);
|
||||
|
||||
// Register event handler for NodesListManager
|
||||
this.nodesListManager = new NodesListManager(this.rmContext);
|
||||
|
@ -175,10 +179,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.rmDispatcher.register(RMNodeEventType.class,
|
||||
new NodeEventDispatcher(this.rmContext));
|
||||
|
||||
//TODO change this to be random
|
||||
this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
||||
.createSecretKey("Dummy".getBytes()));
|
||||
|
||||
this.nmLivelinessMonitor = createNMLivelinessMonitor();
|
||||
addService(this.nmLivelinessMonitor);
|
||||
|
||||
|
@ -233,6 +233,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
}
|
||||
|
||||
protected ApplicationTokenSecretManager createApplicationTokenSecretManager(
|
||||
Configuration conf) {
|
||||
return new ApplicationTokenSecretManager(conf);
|
||||
}
|
||||
|
||||
protected ResourceScheduler createScheduler() {
|
||||
return ReflectionUtils.newInstance(this.conf.getClass(
|
||||
YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
|
||||
|
@ -240,9 +245,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
|
||||
protected ApplicationMasterLauncher createAMLauncher() {
|
||||
return new ApplicationMasterLauncher(
|
||||
this.appTokenSecretManager, this.clientToAMSecretManager,
|
||||
this.rmContext);
|
||||
return new ApplicationMasterLauncher(this.clientToAMSecretManager,
|
||||
this.rmContext);
|
||||
}
|
||||
|
||||
private NMLivelinessMonitor createNMLivelinessMonitor() {
|
||||
|
@ -273,6 +277,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
new LinkedBlockingQueue<SchedulerEvent>();
|
||||
private final Thread eventProcessor;
|
||||
private volatile boolean stopped = false;
|
||||
private boolean shouldExitOnError = false;
|
||||
|
||||
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
|
||||
super(SchedulerEventDispatcher.class.getName());
|
||||
|
@ -281,6 +286,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.eventProcessor.setName("ResourceManager Event Processor");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void init(Configuration conf) {
|
||||
this.shouldExitOnError =
|
||||
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
|
||||
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
this.eventProcessor.start();
|
||||
|
@ -306,8 +319,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
} catch (Throwable t) {
|
||||
LOG.fatal("Error in handling event type " + event.getType()
|
||||
+ " to the scheduler", t);
|
||||
if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
|
||||
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
|
||||
if (shouldExitOnError) {
|
||||
LOG.info("Exiting, bbye..");
|
||||
System.exit(-1);
|
||||
}
|
||||
|
@ -453,6 +465,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
throw new YarnException("Failed to login", ie);
|
||||
}
|
||||
|
||||
this.appTokenSecretManager.start();
|
||||
|
||||
startWepApp();
|
||||
DefaultMetricsSystem.initialize("ResourceManager");
|
||||
JvmMetrics.initSingleton("ResourceManager", null);
|
||||
|
@ -487,6 +501,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
rmDTSecretManager.stopThreads();
|
||||
|
||||
this.appTokenSecretManager.stop();
|
||||
|
||||
/*synchronized(shutdown) {
|
||||
shutdown.set(true);
|
||||
shutdown.notifyAll();
|
||||
|
@ -524,8 +540,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
return new ApplicationMasterService(this.rmContext,
|
||||
this.appTokenSecretManager, scheduler);
|
||||
return new ApplicationMasterService(this.rmContext, scheduler);
|
||||
}
|
||||
|
||||
|
||||
|
@ -571,6 +586,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
return this.applicationACLsManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
|
||||
return this.appTokenSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover(RMState state) throws Exception {
|
||||
resourceTracker.recover(state);
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
|
||||
|
@ -76,7 +75,6 @@ public class AMLauncher implements Runnable {
|
|||
private final Configuration conf;
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
private final ApplicationTokenSecretManager applicationTokenSecretManager;
|
||||
private final ClientToAMSecretManager clientToAMSecretManager;
|
||||
private final AMLauncherEventType eventType;
|
||||
private final RMContext rmContext;
|
||||
|
@ -86,11 +84,9 @@ public class AMLauncher implements Runnable {
|
|||
|
||||
public AMLauncher(RMContext rmContext, RMAppAttempt application,
|
||||
AMLauncherEventType eventType,
|
||||
ApplicationTokenSecretManager applicationTokenSecretManager,
|
||||
ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
|
||||
this.application = application;
|
||||
this.conf = conf;
|
||||
this.applicationTokenSecretManager = applicationTokenSecretManager;
|
||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
||||
this.eventType = eventType;
|
||||
this.rmContext = rmContext;
|
||||
|
@ -129,6 +125,7 @@ public class AMLauncher implements Runnable {
|
|||
containerMgrProxy.stopContainer(stopRequest);
|
||||
}
|
||||
|
||||
// Protected. For tests.
|
||||
protected ContainerManager getContainerMgrProxy(
|
||||
final ContainerId containerId) {
|
||||
|
||||
|
@ -220,7 +217,7 @@ public class AMLauncher implements Runnable {
|
|||
application.getAppAttemptId());
|
||||
Token<ApplicationTokenIdentifier> token =
|
||||
new Token<ApplicationTokenIdentifier>(id,
|
||||
this.applicationTokenSecretManager);
|
||||
this.rmContext.getApplicationTokenSecretManager());
|
||||
String schedulerAddressStr =
|
||||
this.conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
|
@ -42,20 +41,16 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
|||
private final BlockingQueue<Runnable> masterEvents
|
||||
= new LinkedBlockingQueue<Runnable>();
|
||||
|
||||
protected ApplicationTokenSecretManager applicationTokenSecretManager;
|
||||
private ClientToAMSecretManager clientToAMSecretManager;
|
||||
protected final RMContext context;
|
||||
|
||||
public ApplicationMasterLauncher(
|
||||
ApplicationTokenSecretManager applicationTokenSecretManager,
|
||||
ClientToAMSecretManager clientToAMSecretManager,
|
||||
RMContext context) {
|
||||
ClientToAMSecretManager clientToAMSecretManager, RMContext context) {
|
||||
super(ApplicationMasterLauncher.class.getName());
|
||||
this.context = context;
|
||||
this.launcherPool = new ThreadPoolExecutor(10, 10, 1,
|
||||
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
||||
this.launcherHandlingThread = new LauncherThread();
|
||||
this.applicationTokenSecretManager = applicationTokenSecretManager;
|
||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
||||
}
|
||||
|
||||
|
@ -66,8 +61,9 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
|||
|
||||
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
||||
AMLauncherEventType event) {
|
||||
Runnable launcher = new AMLauncher(context, application, event,
|
||||
applicationTokenSecretManager, clientToAMSecretManager, getConfig());
|
||||
Runnable launcher =
|
||||
new AMLauncher(context, application, event, clientToAMSecretManager,
|
||||
getConfig());
|
||||
return launcher;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,13 +33,13 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
|||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class RMAppAttemptImpl implements RMAppAttempt {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
|
||||
|
@ -95,7 +96,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|||
RMAppAttemptEvent> stateMachine;
|
||||
|
||||
private final RMContext rmContext;
|
||||
@SuppressWarnings("rawtypes")
|
||||
private final EventHandler eventHandler;
|
||||
private final YarnScheduler scheduler;
|
||||
private final ApplicationMasterService masterService;
|
||||
|
@ -539,7 +539,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|||
}
|
||||
|
||||
private static final class AttemptStartedTransition extends BaseTransition {
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
|
@ -638,12 +637,13 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
|
||||
ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
|
||||
|
||||
// Tell the AMS. Unregister from the ApplicationMasterService
|
||||
appAttempt.masterService
|
||||
.unregisterAttempt(appAttempt.applicationAttemptId);
|
||||
appAttempt.masterService.unregisterAttempt(appAttemptId);
|
||||
|
||||
// Tell the application and the scheduler
|
||||
ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId();
|
||||
ApplicationId applicationId = appAttemptId.getApplicationId();
|
||||
RMAppEvent appEvent = null;
|
||||
switch (finalAttemptState) {
|
||||
case FINISHED:
|
||||
|
@ -676,8 +676,12 @@ public class RMAppAttemptImpl implements RMAppAttempt {
|
|||
}
|
||||
|
||||
appAttempt.eventHandler.handle(appEvent);
|
||||
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttempt
|
||||
.getAppAttemptId(), finalAttemptState));
|
||||
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
|
||||
finalAttemptState));
|
||||
|
||||
// Remove the AppAttempt from the ApplicationTokenSecretManager
|
||||
appAttempt.rmContext.getApplicationTokenSecretManager()
|
||||
.applicationMasterFinished(appAttemptId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
|
||||
/**
|
||||
* Application-tokens are per ApplicationAttempt. If users redistribute their
|
||||
* tokens, it is their headache, god save them. I mean you are not supposed to
|
||||
* distribute keys to your vault, right? Anyways, ResourceManager saves each
|
||||
* token locally in memory till application finishes and to a store for restart,
|
||||
* so no need to remember master-keys even after rolling them.
|
||||
*/
|
||||
public class ApplicationTokenSecretManager extends
|
||||
SecretManager<ApplicationTokenIdentifier> {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(ApplicationTokenSecretManager.class);
|
||||
|
||||
private SecretKey masterKey;
|
||||
private final Timer timer;
|
||||
private final long rollingInterval;
|
||||
|
||||
private final Map<ApplicationAttemptId, byte[]> passwords =
|
||||
new HashMap<ApplicationAttemptId, byte[]>();
|
||||
|
||||
/**
|
||||
* Create an {@link ApplicationTokenSecretManager}
|
||||
*/
|
||||
public ApplicationTokenSecretManager(Configuration conf) {
|
||||
rollMasterKey();
|
||||
this.timer = new Timer();
|
||||
this.rollingInterval =
|
||||
conf
|
||||
.getLong(
|
||||
YarnConfiguration.RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
||||
YarnConfiguration.DEFAULT_RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
this.timer.cancel();
|
||||
}
|
||||
|
||||
public synchronized void applicationMasterFinished(
|
||||
ApplicationAttemptId appAttemptId) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Application finished, removing password for " + appAttemptId);
|
||||
}
|
||||
this.passwords.remove(appAttemptId);
|
||||
}
|
||||
|
||||
private class MasterKeyRoller extends TimerTask {
|
||||
@Override
|
||||
public void run() {
|
||||
rollMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public synchronized void setMasterKey(SecretKey masterKey) {
|
||||
this.masterKey = masterKey;
|
||||
}
|
||||
|
||||
@Private
|
||||
public synchronized SecretKey getMasterKey() {
|
||||
return this.masterKey;
|
||||
}
|
||||
|
||||
@Private
|
||||
synchronized void rollMasterKey() {
|
||||
LOG.info("Rolling master-key for application-tokens");
|
||||
this.masterKey = generateSecret();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a password for a given {@link ApplicationTokenIdentifier}. Used to
|
||||
* send to the AppicationAttempt which can give it back during authentication.
|
||||
*/
|
||||
@Override
|
||||
public synchronized byte[] createPassword(
|
||||
ApplicationTokenIdentifier identifier) {
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
identifier.getApplicationAttemptId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating password for " + applicationAttemptId);
|
||||
}
|
||||
byte[] password = createPassword(identifier.getBytes(), masterKey);
|
||||
this.passwords.put(applicationAttemptId, password);
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the password for the given {@link ApplicationTokenIdentifier}.
|
||||
* Used by RPC layer to validate a remote {@link ApplicationTokenIdentifier}.
|
||||
*/
|
||||
@Override
|
||||
public synchronized byte[] retrievePassword(
|
||||
ApplicationTokenIdentifier identifier) throws InvalidToken {
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
identifier.getApplicationAttemptId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Trying to retrieve password for " + applicationAttemptId);
|
||||
}
|
||||
byte[] password = this.passwords.get(applicationAttemptId);
|
||||
if (password == null) {
|
||||
throw new InvalidToken("Password not found for ApplicationAttempt "
|
||||
+ applicationAttemptId);
|
||||
}
|
||||
return password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an empty TokenId to be used for de-serializing an
|
||||
* {@link ApplicationTokenIdentifier} by the RPC layer.
|
||||
*/
|
||||
@Override
|
||||
public ApplicationTokenIdentifier createIdentifier() {
|
||||
return new ApplicationTokenIdentifier();
|
||||
}
|
||||
|
||||
}
|
|
@ -24,9 +24,9 @@ import junit.framework.Assert;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
||||
|
@ -55,6 +54,7 @@ import org.apache.log4j.Level;
|
|||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class MockRM extends ResourceManager {
|
||||
|
||||
public MockRM() {
|
||||
|
@ -224,8 +224,7 @@ public class MockRM extends ResourceManager {
|
|||
|
||||
@Override
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
return new ApplicationMasterService(getRMContext(),
|
||||
this.appTokenSecretManager, scheduler) {
|
||||
return new ApplicationMasterService(getRMContext(), scheduler) {
|
||||
@Override
|
||||
public void start() {
|
||||
// override to not start rpc handler
|
||||
|
@ -240,8 +239,8 @@ public class MockRM extends ResourceManager {
|
|||
|
||||
@Override
|
||||
protected ApplicationMasterLauncher createAMLauncher() {
|
||||
return new ApplicationMasterLauncher(this.appTokenSecretManager,
|
||||
this.clientToAMSecretManager, getRMContext()) {
|
||||
return new ApplicationMasterLauncher(this.clientToAMSecretManager,
|
||||
getRMContext()) {
|
||||
@Override
|
||||
public void start() {
|
||||
// override to not start rpc handler
|
||||
|
|
|
@ -60,9 +60,9 @@ public class TestAMAuthorization {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
|
||||
|
||||
private static final class MyContainerManager implements ContainerManager {
|
||||
public static final class MyContainerManager implements ContainerManager {
|
||||
|
||||
Map<String, String> containerEnv;
|
||||
public Map<String, String> amContainerEnv;
|
||||
|
||||
public MyContainerManager() {
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ public class TestAMAuthorization {
|
|||
public StartContainerResponse
|
||||
startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
containerEnv = request.getContainerLaunchContext().getEnvironment();
|
||||
amContainerEnv = request.getContainerLaunchContext().getEnvironment();
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -90,19 +90,15 @@ public class TestAMAuthorization {
|
|||
}
|
||||
}
|
||||
|
||||
private static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
|
||||
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
|
||||
|
||||
private static final Configuration conf = new Configuration();
|
||||
static {
|
||||
public MockRMWithAMS(Configuration conf, ContainerManager containerManager) {
|
||||
super(conf, containerManager);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
}
|
||||
|
||||
public MockRMWithAMS(ContainerManager containerManager) {
|
||||
super(conf, containerManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Skip the login.
|
||||
|
@ -111,15 +107,14 @@ public class TestAMAuthorization {
|
|||
@Override
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
|
||||
return new ApplicationMasterService(getRMContext(),
|
||||
this.appTokenSecretManager, this.scheduler);
|
||||
return new ApplicationMasterService(getRMContext(), this.scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuthorizedAccess() throws Exception {
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
MockRM rm = new MockRMWithAMS(containerManager);
|
||||
final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
@ -132,11 +127,11 @@ public class TestAMAuthorization {
|
|||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.containerEnv == null && waitCount++ < 20) {
|
||||
while (containerManager.amContainerEnv == null && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertNotNull(containerManager.containerEnv);
|
||||
Assert.assertNotNull(containerManager.amContainerEnv);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
|
@ -145,13 +140,10 @@ public class TestAMAuthorization {
|
|||
// Create a client to the RM.
|
||||
final Configuration conf = rm.getConfig();
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
final String serviceAddr = conf.get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
|
||||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
String tokenURLEncodedStr = containerManager.containerEnv
|
||||
String tokenURLEncodedStr = containerManager.amContainerEnv
|
||||
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||
|
@ -162,8 +154,8 @@ public class TestAMAuthorization {
|
|||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, NetUtils
|
||||
.createSocketAddr(serviceAddr), conf);
|
||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rm
|
||||
.getApplicationMasterService().getBindAddress(), conf);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -181,7 +173,7 @@ public class TestAMAuthorization {
|
|||
@Test
|
||||
public void testUnauthorizedAccess() throws Exception {
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
MockRM rm = new MockRMWithAMS(containerManager);
|
||||
MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
@ -191,11 +183,11 @@ public class TestAMAuthorization {
|
|||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.containerEnv == null && waitCount++ < 20) {
|
||||
while (containerManager.amContainerEnv == null && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertNotNull(containerManager.containerEnv);
|
||||
Assert.assertNotNull(containerManager.amContainerEnv);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
|
@ -210,7 +202,7 @@ public class TestAMAuthorization {
|
|||
|
||||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
String tokenURLEncodedStr = containerManager.containerEnv
|
||||
String tokenURLEncodedStr = containerManager.amContainerEnv
|
||||
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||
|
|
|
@ -27,8 +27,8 @@ import junit.framework.Assert;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
|
@ -93,7 +92,7 @@ public class TestAppManager{
|
|||
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
|
||||
rmDispatcher);
|
||||
return new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null) {
|
||||
containerAllocationExpirer, amLivelinessMonitor, null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return map;
|
||||
|
@ -336,9 +335,9 @@ public class TestAppManager{
|
|||
|
||||
RMContext rmContext = mockRMContext(0, now - 10);
|
||||
ResourceScheduler scheduler = new CapacityScheduler();
|
||||
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
Configuration conf = new Configuration();
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(rmContext, scheduler);
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService,
|
||||
new ApplicationACLsManager(conf), conf);
|
||||
|
@ -384,9 +383,9 @@ public class TestAppManager{
|
|||
|
||||
RMContext rmContext = mockRMContext(1, now - 10);
|
||||
ResourceScheduler scheduler = new CapacityScheduler();
|
||||
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
Configuration conf = new Configuration();
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(rmContext, scheduler);
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService,
|
||||
new ApplicationACLsManager(conf), conf);
|
||||
|
@ -432,9 +431,9 @@ public class TestAppManager{
|
|||
// specify 1 here and use same appId below so it gets duplicate entry
|
||||
RMContext rmContext = mockRMContext(1, now - 10);
|
||||
ResourceScheduler scheduler = new CapacityScheduler();
|
||||
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
Configuration conf = new Configuration();
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(rmContext, scheduler);
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService,
|
||||
new ApplicationACLsManager(conf), conf);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -33,7 +32,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||
|
@ -123,14 +121,13 @@ public class TestApplicationMasterLauncher {
|
|||
|
||||
@Override
|
||||
protected ApplicationMasterLauncher createAMLauncher() {
|
||||
return new ApplicationMasterLauncher(super.appTokenSecretManager,
|
||||
super.clientToAMSecretManager, getRMContext()) {
|
||||
return new ApplicationMasterLauncher(super.clientToAMSecretManager,
|
||||
getRMContext()) {
|
||||
@Override
|
||||
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
||||
AMLauncherEventType event) {
|
||||
return new AMLauncher(context, application, event,
|
||||
applicationTokenSecretManager, clientToAMSecretManager,
|
||||
getConfig()) {
|
||||
clientToAMSecretManager, getConfig()) {
|
||||
@Override
|
||||
protected ContainerManager getContainerMgrProxy(
|
||||
ContainerId containerId) {
|
||||
|
|
|
@ -78,7 +78,7 @@ public class TestRMNodeTransitions {
|
|||
|
||||
rmContext =
|
||||
new RMContextImpl(new MemStore(), rmDispatcher, null, null,
|
||||
mock(DelegationTokenRenewer.class));
|
||||
mock(DelegationTokenRenewer.class), null);
|
||||
scheduler = mock(YarnScheduler.class);
|
||||
doAnswer(
|
||||
new Answer<Void>() {
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
|
@ -55,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
|
@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TestNMExpiry {
|
|||
// Dispatcher that processes events inline
|
||||
Dispatcher dispatcher = new InlineDispatcher();
|
||||
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
||||
null, null);
|
||||
null, null, null);
|
||||
dispatcher.register(SchedulerEventType.class,
|
||||
new InlineDispatcher.EmptyEventHandler());
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestRMNMRPCResponseId {
|
|||
}
|
||||
});
|
||||
RMContext context =
|
||||
new RMContextImpl(new MemStore(), dispatcher, null, null, null);
|
||||
new RMContextImpl(new MemStore(), dispatcher, null, null, null, null);
|
||||
dispatcher.register(RMNodeEventType.class,
|
||||
new ResourceManager.NodeEventDispatcher(context));
|
||||
NodesListManager nodesListManager = new NodesListManager(context);
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
|
@ -48,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -118,8 +117,10 @@ public class TestRMAppTransitions {
|
|||
ContainerAllocationExpirer containerAllocationExpirer =
|
||||
mock(ContainerAllocationExpirer.class);
|
||||
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||
this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null);
|
||||
this.rmContext =
|
||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null,
|
||||
new ApplicationTokenSecretManager(conf));
|
||||
|
||||
rmDispatcher.register(RMAppAttemptEventType.class,
|
||||
new TestApplicationAttemptEventDispatcher(this.rmContext));
|
||||
|
@ -142,9 +143,8 @@ public class TestRMAppTransitions {
|
|||
String clientTokenStr = "bogusstring";
|
||||
ApplicationStore appStore = mock(ApplicationStore.class);
|
||||
YarnScheduler scheduler = mock(YarnScheduler.class);
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(rmContext, scheduler);
|
||||
|
||||
RMApp application = new RMAppImpl(applicationId, rmContext,
|
||||
conf, name, user,
|
||||
|
|
|
@ -17,9 +17,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -61,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -136,8 +143,10 @@ public class TestRMAppAttemptTransitions {
|
|||
ContainerAllocationExpirer containerAllocationExpirer =
|
||||
mock(ContainerAllocationExpirer.class);
|
||||
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||
rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null);
|
||||
rmContext =
|
||||
new RMContextImpl(new MemStore(), rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, null,
|
||||
new ApplicationTokenSecretManager(new Configuration()));
|
||||
|
||||
scheduler = mock(YarnScheduler.class);
|
||||
masterService = mock(ApplicationMasterService.class);
|
||||
|
|
|
@ -18,10 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -42,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
|
||||
public class TestUtils {
|
||||
private static final Log LOG = LogFactory.getLog(TestUtils.class);
|
||||
|
@ -74,8 +79,9 @@ public class TestUtils {
|
|||
ContainerAllocationExpirer cae =
|
||||
new ContainerAllocationExpirer(nullDispatcher);
|
||||
|
||||
RMContext rmContext =
|
||||
new RMContextImpl(null, nullDispatcher, cae, null, null);
|
||||
RMContext rmContext =
|
||||
new RMContextImpl(null, nullDispatcher, cae, null, null,
|
||||
new ApplicationTokenSecretManager(new Configuration()));
|
||||
|
||||
return rmContext;
|
||||
}
|
||||
|
|
|
@ -85,7 +85,8 @@ public class TestFifoScheduler {
|
|||
@Test
|
||||
public void testAppAttemptMetrics() throws Exception {
|
||||
AsyncDispatcher dispatcher = new InlineDispatcher();
|
||||
RMContext rmContext = new RMContextImpl(null, dispatcher, null, null, null);
|
||||
RMContext rmContext =
|
||||
new RMContextImpl(null, dispatcher, null, null, null, null);
|
||||
|
||||
FifoScheduler schedular = new FifoScheduler();
|
||||
schedular.reinitialize(new Configuration(), null, rmContext);
|
||||
|
|
|
@ -0,0 +1,234 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.security.PrivilegedAction;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestApplicationTokens {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestApplicationTokens.class);
|
||||
|
||||
/**
|
||||
* Validate that application tokens are unusable after the
|
||||
* application-finishes.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testTokenExpiry() throws Exception {
|
||||
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
|
||||
rm.start();
|
||||
|
||||
try {
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
||||
RMApp app = rm.submitApp(1024);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.amContainerEnv == null && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertNotNull(containerManager.amContainerEnv);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
|
||||
// Create a client to the RM.
|
||||
final Configuration conf = rm.getConfig();
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
|
||||
UserGroupInformation currentUser =
|
||||
UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
String tokenURLEncodedStr =
|
||||
containerManager.amContainerEnv
|
||||
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||
token.decodeFromUrlString(tokenURLEncodedStr);
|
||||
currentUser.addToken(token);
|
||||
|
||||
AMRMProtocol rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
|
||||
RegisterApplicationMasterRequest request =
|
||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
request.setApplicationAttemptId(applicationAttemptId);
|
||||
rmClient.registerApplicationMaster(request);
|
||||
|
||||
FinishApplicationMasterRequest finishAMRequest =
|
||||
Records.newRecord(FinishApplicationMasterRequest.class);
|
||||
finishAMRequest.setAppAttemptId(applicationAttemptId);
|
||||
finishAMRequest
|
||||
.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
||||
finishAMRequest.setDiagnostics("diagnostics");
|
||||
finishAMRequest.setTrackingUrl("url");
|
||||
rmClient.finishApplicationMaster(finishAMRequest);
|
||||
|
||||
// Now simulate trying to allocate. RPC call itself should throw auth
|
||||
// exception.
|
||||
rpc.stopProxy(rmClient, conf); // To avoid using cached client
|
||||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
request.setApplicationAttemptId(BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(12345, 78), 987));
|
||||
AllocateRequest allocateRequest =
|
||||
Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
try {
|
||||
rmClient.allocate(allocateRequest);
|
||||
Assert.fail("You got to be kidding me! "
|
||||
+ "Using App tokens after app-finish should fail!");
|
||||
} catch (Throwable t) {
|
||||
LOG.info("Exception found is ", t);
|
||||
// The exception will still have the earlier appAttemptId as it picks it
|
||||
// up from the token.
|
||||
Assert.assertTrue(t.getCause().getMessage().contains(
|
||||
"Password not found for ApplicationAttempt " +
|
||||
applicationAttemptId.toString()));
|
||||
}
|
||||
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate master-key-roll-over and that tokens are usable even after
|
||||
* master-key-roll-over.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testMasterKeyRollOver() throws Exception {
|
||||
|
||||
Configuration config = new Configuration();
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
final MockRM rm = new MockRMWithAMS(config, containerManager);
|
||||
rm.start();
|
||||
|
||||
try {
|
||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||
|
||||
RMApp app = rm.submitApp(1024);
|
||||
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
int waitCount = 0;
|
||||
while (containerManager.amContainerEnv == null && waitCount++ < 20) {
|
||||
LOG.info("Waiting for AM Launch to happen..");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Assert.assertNotNull(containerManager.amContainerEnv);
|
||||
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||
|
||||
// Create a client to the RM.
|
||||
final Configuration conf = rm.getConfig();
|
||||
final YarnRPC rpc = YarnRPC.create(conf);
|
||||
|
||||
UserGroupInformation currentUser =
|
||||
UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
String tokenURLEncodedStr =
|
||||
containerManager.amContainerEnv
|
||||
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
|
||||
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
|
||||
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
|
||||
token.decodeFromUrlString(tokenURLEncodedStr);
|
||||
currentUser.addToken(token);
|
||||
|
||||
AMRMProtocol rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
|
||||
RegisterApplicationMasterRequest request =
|
||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
request.setApplicationAttemptId(applicationAttemptId);
|
||||
rmClient.registerApplicationMaster(request);
|
||||
|
||||
// One allocate call.
|
||||
AllocateRequest allocateRequest =
|
||||
Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
|
||||
.getReboot());
|
||||
|
||||
// Simulate a master-key-roll-over
|
||||
ApplicationTokenSecretManager appTokenSecretManager =
|
||||
rm.getRMContext().getApplicationTokenSecretManager();
|
||||
SecretKey oldKey = appTokenSecretManager.getMasterKey();
|
||||
appTokenSecretManager.rollMasterKey();
|
||||
SecretKey newKey = appTokenSecretManager.getMasterKey();
|
||||
Assert.assertFalse("Master key should have changed!",
|
||||
oldKey.equals(newKey));
|
||||
|
||||
// Another allocate call. Should continue to work.
|
||||
rpc.stopProxy(rmClient, conf); // To avoid using cached client
|
||||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
allocateRequest = Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
|
||||
.getReboot());
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private AMRMProtocol createRMClient(final MockRM rm,
|
||||
final Configuration conf, final YarnRPC rpc,
|
||||
UserGroupInformation currentUser) {
|
||||
return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rm
|
||||
.getApplicationMasterService().getBindAddress(), conf);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -152,7 +152,7 @@ public class TestRMWebApp {
|
|||
for (RMNode node : deactivatedNodes) {
|
||||
deactivatedNodesMap.put(node.getHostName(), node);
|
||||
}
|
||||
return new RMContextImpl(new MemStore(), null, null, null, null) {
|
||||
return new RMContextImpl(new MemStore(), null, null, null, null, null) {
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return applicationsMaps;
|
||||
|
|
|
@ -78,12 +78,12 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
@ -387,20 +387,19 @@ public class TestContainerManagerSecurity {
|
|||
appAttempt.getAppAttemptId().toString());
|
||||
|
||||
// Ask for a container from the RM
|
||||
String schedulerAddressString = conf.get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
final InetSocketAddress schedulerAddr = NetUtils
|
||||
.createSocketAddr(schedulerAddressString);
|
||||
final InetSocketAddress schedulerAddr =
|
||||
resourceManager.getApplicationMasterService().getBindAddress();
|
||||
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
|
||||
appAttempt.getAppAttemptId());
|
||||
ApplicationTokenSecretManager appTokenSecretManager = new ApplicationTokenSecretManager();
|
||||
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
||||
.createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
|
||||
// ResourceManager.java
|
||||
Token<ApplicationTokenIdentifier> appToken = new Token<ApplicationTokenIdentifier>(
|
||||
appTokenIdentifier, appTokenSecretManager);
|
||||
appToken.setService(new Text(schedulerAddressString));
|
||||
ApplicationTokenSecretManager appTokenSecretManager =
|
||||
new ApplicationTokenSecretManager(conf);
|
||||
appTokenSecretManager.setMasterKey(resourceManager
|
||||
.getApplicationTokenSecretManager().getMasterKey());
|
||||
Token<ApplicationTokenIdentifier> appToken =
|
||||
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
|
||||
appTokenSecretManager);
|
||||
appToken.setService(new Text(schedulerAddr.getHostName() + ":"
|
||||
+ schedulerAddr.getPort()));
|
||||
currentUser.addToken(appToken);
|
||||
|
||||
AMRMProtocol scheduler = currentUser
|
||||
|
|
Loading…
Reference in New Issue