MAPREDUCE-3256. Added authorization checks for the protocol between NodeManager and ApplicationMaster. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1194850 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2dcd43c7b1
commit
7f4dc27757
|
@ -1807,7 +1807,7 @@ Release 0.23.0 - Unreleased
|
|||
etc. (Jonathan Eagles via acmurthy)
|
||||
|
||||
MAPREDUCE-3257. Added authorization checks for the protocol between
|
||||
ResourceManager and ApplicatoinMaster. (vinodkv via acmurthy)
|
||||
ResourceManager and ApplicationMaster. (vinodkv via acmurthy)
|
||||
|
||||
MAPREDUCE-3259. Added java.library.path of NodeManager to
|
||||
ContainerLocalizer in LinuxContainerExecutor. (Kihwal Lee via acmurthy)
|
||||
|
@ -1859,6 +1859,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-3306. Fixed a bug in NodeManager ApplicationImpl that was causing
|
||||
NodeManager to crash. (vinodkv)
|
||||
|
||||
MAPREDUCE-3256. Added authorization checks for the protocol between
|
||||
NodeManager and ApplicationMaster. (vinodkv via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -21,11 +21,13 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -83,10 +85,14 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
private BlockingQueue<ContainerLauncherEvent> eventQueue =
|
||||
new LinkedBlockingQueue<ContainerLauncherEvent>();
|
||||
private RecordFactory recordFactory;
|
||||
//have a cache/map of UGIs so as to avoid creating too many RPC
|
||||
//client connection objects to the same NodeManager
|
||||
private ConcurrentMap<String, UserGroupInformation> ugiMap =
|
||||
new ConcurrentHashMap<String, UserGroupInformation>();
|
||||
|
||||
// To track numNodes.
|
||||
Set<String> allNodes = new HashSet<String>();
|
||||
|
||||
// have a cache/map of proxies so as to avoid creating multiple RPC
|
||||
// client connection objects for the same container.
|
||||
private Map<ContainerId, ContainerManager> clientCache
|
||||
= new HashMap<ContainerId, ContainerManager>();
|
||||
|
||||
public ContainerLauncherImpl(AppContext context) {
|
||||
super(ContainerLauncherImpl.class.getName());
|
||||
|
@ -134,7 +140,7 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
|
||||
// nodes where containers will run at *this* point of time. This is
|
||||
// *not* the cluster size and doesn't need to be.
|
||||
int numNodes = ugiMap.size();
|
||||
int numNodes = allNodes.size();
|
||||
int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
|
||||
|
||||
if (poolSize <= idealPoolSize) {
|
||||
|
@ -142,7 +148,8 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
// later is just a buffer so we are not always increasing the
|
||||
// pool-size
|
||||
int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
|
||||
LOG.debug("Setting pool size to " + newPoolSize);
|
||||
LOG.info("Setting ContainerLauncher pool size to "
|
||||
+ newPoolSize);
|
||||
launcherPool.setCorePoolSize(newPoolSize);
|
||||
}
|
||||
}
|
||||
|
@ -167,37 +174,43 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
super.stop();
|
||||
}
|
||||
|
||||
protected ContainerManager getCMProxy(
|
||||
protected ContainerManager getCMProxy(ContainerId containerID,
|
||||
final String containerManagerBindAddr, ContainerToken containerToken)
|
||||
throws IOException {
|
||||
|
||||
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
synchronized (this.clientCache) {
|
||||
|
||||
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
||||
containerToken.getIdentifier().array(), containerToken
|
||||
.getPassword().array(), new Text(containerToken.getKind()),
|
||||
new Text(containerToken.getService()));
|
||||
// the user in createRemoteUser in this context is not important
|
||||
UserGroupInformation ugi = UserGroupInformation
|
||||
.createRemoteUser(containerManagerBindAddr);
|
||||
ugi.addToken(token);
|
||||
ugiMap.putIfAbsent(containerManagerBindAddr, ugi);
|
||||
if (this.clientCache.containsKey(containerID)) {
|
||||
return this.clientCache.get(containerID);
|
||||
}
|
||||
|
||||
user = ugiMap.get(containerManagerBindAddr);
|
||||
this.allNodes.add(containerManagerBindAddr);
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
||||
containerToken.getIdentifier().array(), containerToken
|
||||
.getPassword().array(), new Text(containerToken.getKind()),
|
||||
new Text(containerToken.getService()));
|
||||
// the user in createRemoteUser in this context has to be ContainerID
|
||||
user = UserGroupInformation.createRemoteUser(containerID.toString());
|
||||
user.addToken(token);
|
||||
}
|
||||
|
||||
ContainerManager proxy = user
|
||||
.doAs(new PrivilegedAction<ContainerManager>() {
|
||||
@Override
|
||||
public ContainerManager run() {
|
||||
YarnRPC rpc = YarnRPC.create(getConfig());
|
||||
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
||||
NetUtils.createSocketAddr(containerManagerBindAddr),
|
||||
getConfig());
|
||||
}
|
||||
});
|
||||
this.clientCache.put(containerID, proxy);
|
||||
return proxy;
|
||||
}
|
||||
ContainerManager proxy =
|
||||
user.doAs(new PrivilegedAction<ContainerManager>() {
|
||||
@Override
|
||||
public ContainerManager run() {
|
||||
YarnRPC rpc = YarnRPC.create(getConfig());
|
||||
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
||||
NetUtils.createSocketAddr(containerManagerBindAddr),
|
||||
getConfig());
|
||||
}
|
||||
});
|
||||
return proxy;
|
||||
}
|
||||
|
||||
private static class CommandTimer extends TimerTask {
|
||||
|
@ -213,7 +226,6 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
+ ". Interrupting and returning";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.warn(this.message);
|
||||
|
@ -255,8 +267,8 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
timer.schedule(new CommandTimer(Thread.currentThread(), event),
|
||||
nmTimeOut);
|
||||
|
||||
ContainerManager proxy = getCMProxy(containerManagerBindAddr,
|
||||
containerToken);
|
||||
ContainerManager proxy = getCMProxy(containerID,
|
||||
containerManagerBindAddr, containerToken);
|
||||
|
||||
// Interruped during getProxy, but that didn't throw exception
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
|
@ -331,8 +343,8 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
timer.schedule(new CommandTimer(Thread.currentThread(), event),
|
||||
nmTimeOut);
|
||||
|
||||
ContainerManager proxy = getCMProxy(containerManagerBindAddr,
|
||||
containerToken);
|
||||
ContainerManager proxy = getCMProxy(containerID,
|
||||
containerManagerBindAddr, containerToken);
|
||||
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
// The timer cancelled the command in the mean while. No need to
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -109,7 +110,7 @@ public class TestContainerLauncher {
|
|||
protected ContainerLauncher createContainerLauncher(AppContext context) {
|
||||
return new ContainerLauncherImpl(context) {
|
||||
@Override
|
||||
protected ContainerManager getCMProxy(
|
||||
protected ContainerManager getCMProxy(ContainerId containerID,
|
||||
String containerManagerBindAddr, ContainerToken containerToken)
|
||||
throws IOException {
|
||||
try {
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
|||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -218,7 +219,7 @@ public class TestFail {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ContainerManager getCMProxy(
|
||||
protected ContainerManager getCMProxy(ContainerId contianerID,
|
||||
String containerManagerBindAddr, ContainerToken containerToken)
|
||||
throws IOException {
|
||||
try {
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.AMRMProtocol;
|
|||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public interface Resource extends Comparable<Resource> {
|
||||
public abstract class Resource implements Comparable<Resource> {
|
||||
|
||||
/**
|
||||
* Get <em>memory</em> of the resource.
|
||||
|
@ -53,5 +53,31 @@ public interface Resource extends Comparable<Resource> {
|
|||
@Public
|
||||
@Stable
|
||||
public abstract void setMemory(int memory);
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + getMemory();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
Resource other = (Resource) obj;
|
||||
if (getMemory() != other.getMemory())
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "memory: " + getMemory();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,14 +19,11 @@
|
|||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder;
|
||||
|
||||
|
||||
|
||||
public class ResourcePBImpl extends ProtoBase<ResourceProto> implements Resource {
|
||||
public class ResourcePBImpl extends Resource {
|
||||
ResourceProto proto = ResourceProto.getDefaultInstance();
|
||||
ResourceProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
|
|
@ -33,23 +33,22 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||
|
||||
private static Log LOG = LogFactory
|
||||
.getLog(ContainerTokenIdentifier.class);
|
||||
private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class);
|
||||
|
||||
public static final Text KIND = new Text("ContainerToken");
|
||||
|
||||
private ContainerId containerId;
|
||||
private String nmHostName;
|
||||
private String nmHostAddr;
|
||||
private Resource resource;
|
||||
|
||||
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||
Resource r) {
|
||||
this.containerId = containerID;
|
||||
this.nmHostName = hostName;
|
||||
this.nmHostAddr = hostName;
|
||||
this.resource = r;
|
||||
}
|
||||
|
||||
|
@ -57,59 +56,46 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
|||
}
|
||||
|
||||
public ContainerId getContainerID() {
|
||||
return containerId;
|
||||
return this.containerId;
|
||||
}
|
||||
|
||||
public String getNmHostName() {
|
||||
return nmHostName;
|
||||
public String getNmHostAddress() {
|
||||
return this.nmHostAddr;
|
||||
}
|
||||
|
||||
public Resource getResource() {
|
||||
return resource;
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
LOG.debug("Writing ContainerTokenIdentifier to RPC layer");
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
containerId.getApplicationAttemptId();
|
||||
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
|
||||
ApplicationAttemptId applicationAttemptId = this.containerId
|
||||
.getApplicationAttemptId();
|
||||
ApplicationId applicationId = applicationAttemptId.getApplicationId();
|
||||
out.writeLong(applicationId.getClusterTimestamp());
|
||||
out.writeInt(applicationId.getId());
|
||||
out.writeInt(applicationAttemptId.getAttemptId());
|
||||
out.writeInt(this.containerId.getId());
|
||||
out.writeUTF(this.nmHostName);
|
||||
out.writeUTF(this.nmHostAddr);
|
||||
out.writeInt(this.resource.getMemory());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.containerId =
|
||||
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
|
||||
ContainerId.class);
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
|
||||
ApplicationAttemptId.class);
|
||||
ApplicationId applicationId =
|
||||
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
|
||||
ApplicationId.class);
|
||||
applicationId.setClusterTimestamp(in.readLong());
|
||||
applicationId.setId(in.readInt());
|
||||
applicationAttemptId.setApplicationId(applicationId);
|
||||
applicationAttemptId.setAttemptId(in.readInt());
|
||||
this.containerId.setApplicationAttemptId(applicationAttemptId);
|
||||
this.containerId.setId(in.readInt());
|
||||
this.nmHostName = in.readUTF();
|
||||
this.resource =
|
||||
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
|
||||
Resource.class);
|
||||
this.resource.setMemory(in.readInt());
|
||||
ApplicationId applicationId = BuilderUtils.newApplicationId(
|
||||
in.readLong(), in.readInt());
|
||||
ApplicationAttemptId applicationAttemptId = BuilderUtils
|
||||
.newApplicationAttemptId(applicationId, in.readInt());
|
||||
this.containerId = BuilderUtils.newContainerId(applicationAttemptId, in
|
||||
.readInt());
|
||||
this.nmHostAddr = in.readUTF();
|
||||
this.resource = BuilderUtils.newResource(in.readInt());
|
||||
}
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return this.KIND;
|
||||
return KIND;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,7 +103,6 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
|||
return UserGroupInformation.createRemoteUser(this.containerId.toString());
|
||||
}
|
||||
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static class Renewer extends Token.TrivialRenewer {
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.security;
|
|||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
@ -28,6 +30,10 @@ import org.apache.hadoop.security.token.TokenSelector;
|
|||
public class ContainerTokenSelector implements
|
||||
TokenSelector<ContainerTokenIdentifier> {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(ContainerTokenSelector.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Token<ContainerTokenIdentifier> selectToken(Text service,
|
||||
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||
|
@ -35,6 +41,10 @@ public class ContainerTokenSelector implements
|
|||
return null;
|
||||
}
|
||||
for (Token<? extends TokenIdentifier> token : tokens) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info("Looking for service: " + service + ". Current token is "
|
||||
+ token);
|
||||
}
|
||||
if (ContainerTokenIdentifier.KIND.equals(token.getKind()) &&
|
||||
service.equals(token.getService())) {
|
||||
return (Token<ContainerTokenIdentifier>) token;
|
||||
|
|
|
@ -27,10 +27,11 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
|
@ -45,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
|
@ -256,6 +256,12 @@ public class BuilderUtils {
|
|||
return container;
|
||||
}
|
||||
|
||||
public static Priority newPriority(int p) {
|
||||
Priority priority = recordFactory.newRecordInstance(Priority.class);
|
||||
priority.setPriority(p);
|
||||
return priority;
|
||||
}
|
||||
|
||||
public static ResourceRequest newResourceRequest(Priority priority,
|
||||
String hostName, Resource capability, int numContainers) {
|
||||
ResourceRequest request = recordFactory
|
||||
|
|
|
@ -56,19 +56,19 @@ public class ContainerTokenSecretManager extends
|
|||
@Override
|
||||
public byte[] createPassword(ContainerTokenIdentifier identifier) {
|
||||
LOG.debug("Creating password for " + identifier.getContainerID()
|
||||
+ " to be run on NM " + identifier.getNmHostName() + " "
|
||||
+ this.secretkeys.get(identifier.getNmHostName()));
|
||||
+ " to be run on NM " + identifier.getNmHostAddress() + " "
|
||||
+ this.secretkeys.get(identifier.getNmHostAddress()));
|
||||
return createPassword(identifier.getBytes(),
|
||||
this.secretkeys.get(identifier.getNmHostName()));
|
||||
this.secretkeys.get(identifier.getNmHostAddress()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] retrievePassword(ContainerTokenIdentifier identifier)
|
||||
throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
|
||||
LOG.debug("Retrieving password for " + identifier.getContainerID()
|
||||
+ " to be run on NM " + identifier.getNmHostName());
|
||||
+ " to be run on NM " + identifier.getNmHostAddress());
|
||||
return createPassword(identifier.getBytes(),
|
||||
this.secretkeys.get(identifier.getNmHostName()));
|
||||
this.secretkeys.get(identifier.getNmHostAddress()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,18 +27,19 @@ import java.net.UnknownHostException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
|
@ -105,6 +108,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
final Context context;
|
||||
private final ContainersMonitor containersMonitor;
|
||||
private Server server;
|
||||
private InetAddress resolvedAddress = null;
|
||||
private final ResourceLocalizationService rsrcLocalizationSrvc;
|
||||
private final ContainersLauncher containersLauncher;
|
||||
private final AuxServices auxiliaryServices;
|
||||
|
@ -213,13 +217,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
|
||||
server.start();
|
||||
InetAddress hostNameResolved = null;
|
||||
try {
|
||||
hostNameResolved = InetAddress.getLocalHost();
|
||||
resolvedAddress = InetAddress.getLocalHost();
|
||||
} catch (UnknownHostException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
this.context.getNodeId().setHost(hostNameResolved.getCanonicalHostName());
|
||||
this.context.getNodeId().setHost(resolvedAddress.getCanonicalHostName());
|
||||
this.context.getNodeId().setPort(server.getPort());
|
||||
LOG.info("ContainerManager started at "
|
||||
+ this.context.getNodeId().toString());
|
||||
|
@ -242,6 +245,79 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
super.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Authorize the request.
|
||||
*
|
||||
* @param containerID
|
||||
* of the container
|
||||
* @param launchContext
|
||||
* passed if verifying the startContainer, null otherwise.
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private void authorizeRequest(ContainerId containerID,
|
||||
ContainerLaunchContext launchContext) throws YarnRemoteException {
|
||||
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String containerIDStr = containerID.toString();
|
||||
|
||||
UserGroupInformation remoteUgi;
|
||||
try {
|
||||
remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
String msg = "Cannot obtain the user-name for containerId: "
|
||||
+ containerIDStr + ". Got exception: "
|
||||
+ StringUtils.stringifyException(e);
|
||||
LOG.warn(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
|
||||
boolean unauthorized = false;
|
||||
StringBuilder messageBuilder = new StringBuilder(
|
||||
"Unauthorized request to start container. ");
|
||||
|
||||
if (!remoteUgi.getUserName().equals(containerIDStr)) {
|
||||
unauthorized = true;
|
||||
messageBuilder.append("\nExpected containerId: "
|
||||
+ remoteUgi.getUserName() + " Found: " + containerIDStr);
|
||||
}
|
||||
|
||||
if (launchContext != null) {
|
||||
|
||||
// Verify other things for startContainer() request.
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
|
||||
+ remoteUgi.getTokenIdentifiers().size());
|
||||
}
|
||||
// We must and should get only one TokenIdentifier from the RPC.
|
||||
ContainerTokenIdentifier tokenId = (ContainerTokenIdentifier) remoteUgi
|
||||
.getTokenIdentifiers().iterator().next();
|
||||
if (tokenId == null) {
|
||||
unauthorized = true;
|
||||
messageBuilder
|
||||
.append("\nContainerTokenIdentifier cannot be null! Null found for "
|
||||
+ containerIDStr);
|
||||
} else {
|
||||
|
||||
Resource resource = tokenId.getResource();
|
||||
if (!resource.equals(launchContext.getResource())) {
|
||||
unauthorized = true;
|
||||
messageBuilder.append("\nExpected resource " + resource
|
||||
+ " but found " + launchContext.getResource());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (unauthorized) {
|
||||
String msg = messageBuilder.toString();
|
||||
LOG.error(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a container on this NodeManager.
|
||||
*/
|
||||
|
@ -251,8 +327,11 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
throws YarnRemoteException {
|
||||
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
||||
|
||||
ContainerId containerID = launchContext.getContainerId();
|
||||
authorizeRequest(containerID, launchContext);
|
||||
|
||||
LOG.info(" container is " + request);
|
||||
|
||||
|
||||
// //////////// Parse credentials
|
||||
ByteBuffer tokens = launchContext.getContainerTokens();
|
||||
Credentials credentials = new Credentials();
|
||||
|
@ -274,9 +353,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
// //////////// End of parsing credentials
|
||||
|
||||
Container container =
|
||||
new ContainerImpl(getConfig(), this.dispatcher, launchContext, credentials, metrics);
|
||||
ContainerId containerID = launchContext.getContainerId();
|
||||
Container container = new ContainerImpl(getConfig(), this.dispatcher,
|
||||
launchContext, credentials, metrics);
|
||||
ApplicationId applicationID =
|
||||
containerID.getApplicationAttemptId().getApplicationId();
|
||||
if (context.getContainers().putIfAbsent(containerID, container) != null) {
|
||||
|
@ -319,39 +397,36 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the container running on this NodeManager.
|
||||
*/
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public StopContainerResponse stopContainer(StopContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
|
||||
ContainerId containerID = request.getContainerId();
|
||||
// TODO: Only the container's owner can kill containers today.
|
||||
authorizeRequest(containerID, null);
|
||||
|
||||
StopContainerResponse response =
|
||||
recordFactory.newRecordInstance(StopContainerResponse.class);
|
||||
|
||||
ContainerId containerID = request.getContainerId();
|
||||
Container container = this.context.getContainers().get(containerID);
|
||||
if (container == null) {
|
||||
LOG.warn("Trying to stop unknown container " + containerID);
|
||||
String userName;
|
||||
try {
|
||||
userName = UserGroupInformation.getCurrentUser().getUserName();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error finding userName", e);
|
||||
return response;
|
||||
}
|
||||
NMAuditLogger.logFailure(userName,
|
||||
NMAuditLogger.logFailure("UnknownUser",
|
||||
AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
|
||||
"Trying to stop unknown container!",
|
||||
containerID.getApplicationAttemptId().getApplicationId(),
|
||||
containerID);
|
||||
return response; // Return immediately.
|
||||
}
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerID,
|
||||
"Container killed by the ApplicationMaster."));
|
||||
|
||||
// user logged here not ideal since just getting user from container but
|
||||
// request doesn't have anything and should be coming from user of AM so
|
||||
// should be the same or should be rejected by auth before here.
|
||||
|
||||
NMAuditLogger.logSuccess(container.getUser(),
|
||||
AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
|
||||
containerID.getApplicationAttemptId().getApplicationId(),
|
||||
|
@ -365,20 +440,26 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
|
||||
public GetContainerStatusResponse getContainerStatus(
|
||||
GetContainerStatusRequest request) throws YarnRemoteException {
|
||||
|
||||
ContainerId containerID = request.getContainerId();
|
||||
// TODO: Only the container's owner can get containers' status today.
|
||||
authorizeRequest(containerID, null);
|
||||
|
||||
LOG.info("Getting container-status for " + containerID);
|
||||
Container container = this.context.getContainers().get(containerID);
|
||||
if (container != null) {
|
||||
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
|
||||
LOG.info("Returning " + containerStatus);
|
||||
GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
|
||||
GetContainerStatusResponse response = recordFactory
|
||||
.newRecordInstance(GetContainerStatusResponse.class);
|
||||
response.setStatus(containerStatus);
|
||||
return response;
|
||||
} else {
|
||||
throw RPCUtil.getRemoteException("Container " + containerID
|
||||
+ " is not handled by this NodeManager");
|
||||
}
|
||||
|
||||
throw RPCUtil.getRemoteException("Container " + containerID
|
||||
+ " is not handled by this NodeManager");
|
||||
}
|
||||
|
||||
class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
|
||||
|
@ -412,8 +493,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void handle(ContainerManagerEvent event) {
|
||||
switch (event.getType()) {
|
||||
case FINISH_APPS:
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sec
|
|||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
@ -28,23 +30,23 @@ import org.apache.hadoop.security.token.TokenSelector;
|
|||
public class LocalizerTokenSelector implements
|
||||
TokenSelector<LocalizerTokenIdentifier> {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(LocalizerTokenSelector.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Token<LocalizerTokenIdentifier> selectToken(Text service,
|
||||
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||
System.err.print("=========== Using localizerTokenSelector");
|
||||
// if (service == null) {
|
||||
// return null;
|
||||
// }
|
||||
|
||||
LOG.debug("Using localizerTokenSelector.");
|
||||
|
||||
for (Token<? extends TokenIdentifier> token : tokens) {
|
||||
System.err.print("============ token of kind " + token.getKind() + " is found");
|
||||
if (LocalizerTokenIdentifier.KIND.equals(token.getKind())
|
||||
//&& service.equals(token.getService())
|
||||
) {
|
||||
LOG.debug("Token of kind " + token.getKind() + " is found");
|
||||
if (LocalizerTokenIdentifier.KIND.equals(token.getKind())) {
|
||||
return (Token<LocalizerTokenIdentifier>) token;
|
||||
}
|
||||
}
|
||||
System.err.print("returning null ========== ");
|
||||
LOG.debug("Returning null.");
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -101,9 +100,7 @@ public class AMLauncher implements Runnable {
|
|||
private void connect() throws IOException {
|
||||
ContainerId masterContainerID = application.getMasterContainer().getId();
|
||||
|
||||
containerMgrProxy =
|
||||
getContainerMgrProxy(
|
||||
masterContainerID.getApplicationAttemptId().getApplicationId());
|
||||
containerMgrProxy = getContainerMgrProxy(masterContainerID);
|
||||
}
|
||||
|
||||
private void launch() throws IOException {
|
||||
|
@ -133,7 +130,7 @@ public class AMLauncher implements Runnable {
|
|||
}
|
||||
|
||||
protected ContainerManager getContainerMgrProxy(
|
||||
final ApplicationId applicationID) throws IOException {
|
||||
final ContainerId containerId) {
|
||||
|
||||
Container container = application.getMasterContainer();
|
||||
|
||||
|
@ -141,8 +138,8 @@ public class AMLauncher implements Runnable {
|
|||
|
||||
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
|
||||
|
||||
UserGroupInformation currentUser =
|
||||
UserGroupInformation.createRemoteUser("yarn"); // TODO
|
||||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(containerId.toString());
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
ContainerToken containerToken = container.getContainerToken();
|
||||
Token<ContainerTokenIdentifier> token =
|
||||
|
|
|
@ -133,7 +133,7 @@ public class TestApplicationMasterLauncher {
|
|||
getConfig()) {
|
||||
@Override
|
||||
protected ContainerManager getContainerMgrProxy(
|
||||
ApplicationId applicationID) throws IOException {
|
||||
ContainerId containerId) {
|
||||
return containerManager;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -0,0 +1,501 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerManagerSecurity {
|
||||
|
||||
static Log LOG = LogFactory.getLog(TestContainerManagerSecurity.class);
|
||||
static final RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
private static FileContext localFS = null;
|
||||
private static final File localDir = new File("target",
|
||||
TestContainerManagerSecurity.class.getName() + "-localDir")
|
||||
.getAbsoluteFile();
|
||||
private static MiniYARNCluster yarnCluster;
|
||||
|
||||
static final Configuration conf = new Configuration();
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws AccessControlException,
|
||||
FileNotFoundException, UnsupportedFileSystemException, IOException {
|
||||
localFS = FileContext.getLocalFSFileContext();
|
||||
localFS.delete(new Path(localDir.getAbsolutePath()), true);
|
||||
localDir.mkdir();
|
||||
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
// Set AM expiry interval to be very long.
|
||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
|
||||
.getName());
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
yarnCluster.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuthenticatedUser() throws IOException,
|
||||
InterruptedException {
|
||||
|
||||
LOG.info("Running test for authenticated user");
|
||||
|
||||
ResourceManager resourceManager = yarnCluster.getResourceManager();
|
||||
|
||||
final YarnRPC yarnRPC = YarnRPC.create(conf);
|
||||
|
||||
// Submit an application
|
||||
ApplicationId appID = resourceManager.getClientRMService()
|
||||
.getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
|
||||
.getApplicationId();
|
||||
AMRMProtocol scheduler = submitAndRegisterApplication(resourceManager,
|
||||
yarnRPC, appID);
|
||||
|
||||
// Now request a container.
|
||||
final Container allocatedContainer = requestAndGetContainer(scheduler,
|
||||
appID);
|
||||
|
||||
// Now talk to the NM for launching the container.
|
||||
final ContainerId containerID = allocatedContainer.getId();
|
||||
UserGroupInformation authenticatedUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
||||
containerToken.getIdentifier().array(), containerToken.getPassword()
|
||||
.array(), new Text(containerToken.getKind()), new Text(
|
||||
containerToken.getService()));
|
||||
authenticatedUser.addToken(token);
|
||||
authenticatedUser.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||
ContainerManager.class, NetUtils
|
||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||
conf);
|
||||
LOG.info("Going to make a legal stopContainer() request");
|
||||
StopContainerRequest request = recordFactory
|
||||
.newRecordInstance(StopContainerRequest.class);
|
||||
request.setContainerId(containerID);
|
||||
client.stopContainer(request);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
KillApplicationRequest request = Records
|
||||
.newRecord(KillApplicationRequest.class);
|
||||
request.setApplicationId(appID);
|
||||
resourceManager.getClientRMService().forceKillApplication(request);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaliceUser() throws IOException, InterruptedException {
|
||||
|
||||
LOG.info("Running test for malice user");
|
||||
|
||||
ResourceManager resourceManager = yarnCluster.getResourceManager();
|
||||
|
||||
final YarnRPC yarnRPC = YarnRPC.create(conf);
|
||||
|
||||
// Submit an application
|
||||
ApplicationId appID = resourceManager.getClientRMService()
|
||||
.getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
|
||||
.getApplicationId();
|
||||
AMRMProtocol scheduler = submitAndRegisterApplication(resourceManager,
|
||||
yarnRPC, appID);
|
||||
|
||||
// Now request a container.
|
||||
final Container allocatedContainer = requestAndGetContainer(scheduler,
|
||||
appID);
|
||||
|
||||
// Now talk to the NM for launching the container with modified resource
|
||||
final ContainerId containerID = allocatedContainer.getId();
|
||||
UserGroupInformation maliceUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
|
||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||
byte[] identifierBytes = containerToken.getIdentifier().array();
|
||||
|
||||
DataInputBuffer di = new DataInputBuffer();
|
||||
di.reset(identifierBytes, identifierBytes.length);
|
||||
|
||||
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
|
||||
dummyIdentifier.readFields(di);
|
||||
// Malice user modifies the resource amount
|
||||
Resource modifiedResource = BuilderUtils.newResource(2048);
|
||||
ContainerTokenIdentifier modifiedIdentifier = new ContainerTokenIdentifier(
|
||||
dummyIdentifier.getContainerID(), dummyIdentifier.getNmHostAddress(),
|
||||
modifiedResource);
|
||||
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
||||
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
||||
new Text(containerToken.getKind()), new Text(containerToken
|
||||
.getService()));
|
||||
maliceUser.addToken(modifiedToken);
|
||||
maliceUser.doAs(new PrivilegedAction<Void>() {
|
||||
@Override
|
||||
public Void run() {
|
||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||
ContainerManager.class, NetUtils
|
||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||
conf);
|
||||
|
||||
LOG.info("Going to contact NM: ilLegal request");
|
||||
GetContainerStatusRequest request = recordFactory
|
||||
.newRecordInstance(GetContainerStatusRequest.class);
|
||||
request.setContainerId(containerID);
|
||||
try {
|
||||
client.getContainerStatus(request);
|
||||
fail("Connection initiation with illegally modified "
|
||||
+ "tokens is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.error("Got exception", e);
|
||||
fail("Cannot get a YARN remote exception as "
|
||||
+ "it will indicate RPC success");
|
||||
} catch (Exception e) {
|
||||
Assert.assertEquals(
|
||||
java.lang.reflect.UndeclaredThrowableException.class
|
||||
.getCanonicalName(), e.getClass().getCanonicalName());
|
||||
Assert.assertEquals(
|
||||
"DIGEST-MD5: digest response format violation. "
|
||||
+ "Mismatched response.", e.getCause().getCause()
|
||||
.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
KillApplicationRequest request = Records
|
||||
.newRecord(KillApplicationRequest.class);
|
||||
request.setApplicationId(appID);
|
||||
resourceManager.getClientRMService().forceKillApplication(request);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnauthorizedUser() throws IOException, InterruptedException {
|
||||
|
||||
LOG.info("\n\nRunning test for malice user");
|
||||
|
||||
ResourceManager resourceManager = yarnCluster.getResourceManager();
|
||||
|
||||
final YarnRPC yarnRPC = YarnRPC.create(conf);
|
||||
|
||||
// Submit an application
|
||||
final ApplicationId appID = resourceManager.getClientRMService()
|
||||
.getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
|
||||
.getApplicationId();
|
||||
AMRMProtocol scheduler = submitAndRegisterApplication(resourceManager,
|
||||
yarnRPC, appID);
|
||||
|
||||
// Now request a container.
|
||||
final Container allocatedContainer = requestAndGetContainer(scheduler,
|
||||
appID);
|
||||
|
||||
// Now talk to the NM for launching the container with modified containerID
|
||||
final ContainerId containerID = allocatedContainer.getId();
|
||||
|
||||
UserGroupInformation unauthorizedUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||
|
||||
byte[] identifierBytes = containerToken.getIdentifier().array();
|
||||
DataInputBuffer di = new DataInputBuffer();
|
||||
di.reset(identifierBytes, identifierBytes.length);
|
||||
final ContainerTokenIdentifier tokenId = new ContainerTokenIdentifier();
|
||||
tokenId.readFields(di);
|
||||
|
||||
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
||||
identifierBytes, containerToken.getPassword().array(), new Text(
|
||||
containerToken.getKind()), new Text(containerToken.getService()));
|
||||
|
||||
unauthorizedUser.addToken(token);
|
||||
unauthorizedUser.doAs(new PrivilegedAction<Void>() {
|
||||
@Override
|
||||
public Void run() {
|
||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||
ContainerManager.class, NetUtils
|
||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||
conf);
|
||||
|
||||
LOG.info("Going to contact NM: unauthorized request");
|
||||
|
||||
callWithIllegalContainerID(client, tokenId);
|
||||
callWithIllegalResource(client, tokenId);
|
||||
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
KillApplicationRequest request = Records
|
||||
.newRecord(KillApplicationRequest.class);
|
||||
request.setApplicationId(appID);
|
||||
resourceManager.getClientRMService().forceKillApplication(request);
|
||||
}
|
||||
|
||||
private AMRMProtocol submitAndRegisterApplication(
|
||||
ResourceManager resourceManager, final YarnRPC yarnRPC,
|
||||
ApplicationId appID) throws IOException,
|
||||
UnsupportedFileSystemException, YarnRemoteException,
|
||||
InterruptedException {
|
||||
|
||||
// TODO: Use a resource to work around bugs. Today NM doesn't create local
|
||||
// app-dirs if there are no file to download!!
|
||||
String fileName = "testFile-" + appID.toString();
|
||||
File testFile = new File(localDir.getAbsolutePath(), fileName);
|
||||
FileWriter tmpFile = new FileWriter(testFile);
|
||||
tmpFile.write("testing");
|
||||
tmpFile.close();
|
||||
URL testFileURL = ConverterUtils.getYarnUrlFromPath(FileContext
|
||||
.getFileContext().makeQualified(
|
||||
new Path(localDir.getAbsolutePath(), fileName)));
|
||||
LocalResource rsrc = BuilderUtils.newLocalResource(testFileURL,
|
||||
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, testFile
|
||||
.length(), testFile.lastModified());
|
||||
|
||||
ContainerLaunchContext amContainer = BuilderUtils
|
||||
.newContainerLaunchContext(null, "testUser", BuilderUtils
|
||||
.newResource(1024), Collections.singletonMap(fileName, rsrc),
|
||||
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
||||
new HashMap<String, ByteBuffer>(), null,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
|
||||
ApplicationSubmissionContext appSubmissionContext = recordFactory
|
||||
.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
appSubmissionContext.setApplicationId(appID);
|
||||
appSubmissionContext.setUser("testUser");
|
||||
appSubmissionContext.setAMContainerSpec(amContainer);
|
||||
|
||||
SubmitApplicationRequest submitRequest = recordFactory
|
||||
.newRecordInstance(SubmitApplicationRequest.class);
|
||||
submitRequest.setApplicationSubmissionContext(appSubmissionContext);
|
||||
resourceManager.getClientRMService().submitApplication(submitRequest);
|
||||
|
||||
// Wait till container gets allocated for AM
|
||||
int waitCounter = 0;
|
||||
RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
|
||||
RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
|
||||
RMAppAttemptState state = appAttempt == null ? null : appAttempt
|
||||
.getAppAttemptState();
|
||||
while ((app == null || appAttempt == null || state == null || !state
|
||||
.equals(RMAppAttemptState.LAUNCHED))
|
||||
&& waitCounter++ != 20) {
|
||||
LOG.info("Waiting for applicationAttempt to be created.. ");
|
||||
Thread.sleep(1000);
|
||||
app = resourceManager.getRMContext().getRMApps().get(appID);
|
||||
appAttempt = app == null ? null : app.getCurrentAppAttempt();
|
||||
state = appAttempt == null ? null : appAttempt.getAppAttemptState();
|
||||
}
|
||||
Assert.assertNotNull(app);
|
||||
Assert.assertNotNull(appAttempt);
|
||||
Assert.assertNotNull(state);
|
||||
Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
|
||||
|
||||
UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(
|
||||
appAttempt.getAppAttemptId().toString());
|
||||
|
||||
// Ask for a container from the RM
|
||||
String schedulerAddressString = conf.get(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
final InetSocketAddress schedulerAddr = NetUtils
|
||||
.createSocketAddr(schedulerAddressString);
|
||||
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
|
||||
appAttempt.getAppAttemptId());
|
||||
ApplicationTokenSecretManager appTokenSecretManager = new ApplicationTokenSecretManager();
|
||||
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
||||
.createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
|
||||
// ResourceManager.java
|
||||
Token<ApplicationTokenIdentifier> appToken = new Token<ApplicationTokenIdentifier>(
|
||||
appTokenIdentifier, appTokenSecretManager);
|
||||
appToken.setService(new Text(schedulerAddressString));
|
||||
currentUser.addToken(appToken);
|
||||
|
||||
AMRMProtocol scheduler = currentUser
|
||||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class,
|
||||
schedulerAddr, conf);
|
||||
}
|
||||
});
|
||||
|
||||
// Register the appMaster
|
||||
RegisterApplicationMasterRequest request = recordFactory
|
||||
.newRecordInstance(RegisterApplicationMasterRequest.class);
|
||||
request.setApplicationAttemptId(resourceManager.getRMContext()
|
||||
.getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
|
||||
scheduler.registerApplicationMaster(request);
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
private Container requestAndGetContainer(AMRMProtocol scheduler,
|
||||
ApplicationId appID) throws YarnRemoteException, InterruptedException {
|
||||
|
||||
// Request a container allocation.
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
|
||||
BuilderUtils.newResource(1024), 1));
|
||||
|
||||
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
|
||||
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
|
||||
new ArrayList<ContainerId>());
|
||||
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
|
||||
.getAMResponse().getAllocatedContainers();
|
||||
|
||||
// Modify ask to request no more.
|
||||
allocateRequest.clearAsks();
|
||||
|
||||
int waitCounter = 0;
|
||||
while ((allocatedContainers == null || allocatedContainers.size() == 0)
|
||||
&& waitCounter++ != 20) {
|
||||
LOG.info("Waiting for container to be allocated..");
|
||||
Thread.sleep(1000);
|
||||
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
|
||||
allocatedContainers = scheduler.allocate(allocateRequest)
|
||||
.getAMResponse().getAllocatedContainers();
|
||||
}
|
||||
|
||||
Assert.assertNotNull("Container is not allocted!", allocatedContainers);
|
||||
Assert.assertEquals("Didn't get one container!", 1, allocatedContainers
|
||||
.size());
|
||||
|
||||
return allocatedContainers.get(0);
|
||||
}
|
||||
|
||||
void callWithIllegalContainerID(ContainerManager client,
|
||||
ContainerTokenIdentifier tokenId) {
|
||||
GetContainerStatusRequest request = recordFactory
|
||||
.newRecordInstance(GetContainerStatusRequest.class);
|
||||
ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils
|
||||
.newApplicationAttemptId(tokenId.getContainerID()
|
||||
.getApplicationAttemptId().getApplicationId(), 1), 42);
|
||||
request.setContainerId(newContainerId); // Authenticated but
|
||||
// unauthorized.
|
||||
try {
|
||||
client.getContainerStatus(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
+ "access is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception : ", e);
|
||||
Assert.assertEquals("Unauthorized request to start container. "
|
||||
+ "\nExpected containerId: " + tokenId.getContainerID()
|
||||
+ " Found: " + newContainerId.toString(), e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
void callWithIllegalResource(ContainerManager client,
|
||||
ContainerTokenIdentifier tokenId) {
|
||||
StartContainerRequest request = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
// Authenticated but unauthorized, due to wrong resource
|
||||
ContainerLaunchContext context = BuilderUtils.newContainerLaunchContext(
|
||||
tokenId.getContainerID(), "testUser", BuilderUtils.newResource(2048),
|
||||
new HashMap<String, LocalResource>(), new HashMap<String, String>(),
|
||||
new ArrayList<String>(), new HashMap<String, ByteBuffer>(), null,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
request.setContainerLaunchContext(context);
|
||||
try {
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
+ "access is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception : ", e);
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Unauthorized request to start container. "));
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"\nExpected resource " + tokenId.getResource().toString()
|
||||
+ " but found " + context.getResource().toString()));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,365 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.avro.AvroRuntimeException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.SecurityInfo;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerTokenSecretManager {
|
||||
|
||||
private static Log LOG = LogFactory
|
||||
.getLog(TestContainerTokenSecretManager.class);
|
||||
private static final RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
private static FileContext localFS = null;
|
||||
private static final File localDir = new File("target",
|
||||
TestContainerTokenSecretManager.class.getName() + "-localDir")
|
||||
.getAbsoluteFile();
|
||||
private static MiniYARNCluster yarnCluster;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws AccessControlException,
|
||||
FileNotFoundException, UnsupportedFileSystemException, IOException {
|
||||
localFS = FileContext.getLocalFSFileContext();
|
||||
localFS.delete(new Path(localDir.getAbsolutePath()), true);
|
||||
localDir.mkdir();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() {
|
||||
yarnCluster.stop();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void test() throws IOException, InterruptedException {
|
||||
|
||||
final ApplicationId appID = recordFactory.newRecordInstance(ApplicationId.class);
|
||||
appID.setClusterTimestamp(1234);
|
||||
appID.setId(5);
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
// Set AM expiry interval to be very long.
|
||||
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
yarnCluster =
|
||||
new MiniYARNCluster(TestContainerTokenSecretManager.class.getName());
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
|
||||
ResourceManager resourceManager = yarnCluster.getResourceManager();
|
||||
|
||||
final YarnRPC yarnRPC = YarnRPC.create(conf);
|
||||
|
||||
// Submit an application
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
appSubmissionContext.setApplicationId(appID);
|
||||
ContainerLaunchContext amContainer =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
amContainer.setResource(Resources.createResource(1024));
|
||||
amContainer.setCommands(Arrays.asList("sleep", "100"));
|
||||
appSubmissionContext.setUser("testUser");
|
||||
|
||||
// TODO: Use a resource to work around bugs. Today NM doesn't create local
|
||||
// app-dirs if there are no file to download!!
|
||||
File file = new File(localDir.getAbsolutePath(), "testFile");
|
||||
FileWriter tmpFile = new FileWriter(file);
|
||||
tmpFile.write("testing");
|
||||
tmpFile.close();
|
||||
URL testFileURL =
|
||||
ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext()
|
||||
.makeQualified(new Path(localDir.getAbsolutePath(), "testFile")));
|
||||
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
|
||||
rsrc.setResource(testFileURL);
|
||||
rsrc.setSize(file.length());
|
||||
rsrc.setTimestamp(file.lastModified());
|
||||
rsrc.setType(LocalResourceType.FILE);
|
||||
rsrc.setVisibility(LocalResourceVisibility.PRIVATE);
|
||||
amContainer.setLocalResources(Collections.singletonMap("testFile", rsrc));
|
||||
SubmitApplicationRequest submitRequest = recordFactory
|
||||
.newRecordInstance(SubmitApplicationRequest.class);
|
||||
submitRequest.setApplicationSubmissionContext(appSubmissionContext);
|
||||
appSubmissionContext.setAMContainerSpec(amContainer);
|
||||
resourceManager.getClientRMService().submitApplication(submitRequest);
|
||||
|
||||
// Wait till container gets allocated for AM
|
||||
int waitCounter = 0;
|
||||
RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
|
||||
RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
|
||||
RMAppAttemptState state = appAttempt == null ? null : appAttempt
|
||||
.getAppAttemptState();
|
||||
while ((app == null || appAttempt == null || state == null
|
||||
|| !state.equals(RMAppAttemptState.LAUNCHED)) && waitCounter++ != 20) {
|
||||
LOG.info("Waiting for applicationAttempt to be created.. ");
|
||||
Thread.sleep(1000);
|
||||
app = resourceManager.getRMContext().getRMApps().get(appID);
|
||||
appAttempt = app == null ? null : app.getCurrentAppAttempt();
|
||||
state = appAttempt == null ? null : appAttempt.getAppAttemptState();
|
||||
}
|
||||
Assert.assertNotNull(app);
|
||||
Assert.assertNotNull(appAttempt);
|
||||
Assert.assertNotNull(state);
|
||||
Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
|
||||
|
||||
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
||||
|
||||
// Ask for a container from the RM
|
||||
String schedulerAddressString =
|
||||
conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
||||
final InetSocketAddress schedulerAddr =
|
||||
NetUtils.createSocketAddr(schedulerAddressString);
|
||||
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
|
||||
appAttempt.getAppAttemptId());
|
||||
ApplicationTokenSecretManager appTokenSecretManager =
|
||||
new ApplicationTokenSecretManager();
|
||||
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
||||
.createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
|
||||
// ResourceManager.java
|
||||
Token<ApplicationTokenIdentifier> appToken =
|
||||
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
|
||||
appTokenSecretManager);
|
||||
appToken.setService(new Text(schedulerAddressString));
|
||||
currentUser.addToken(appToken);
|
||||
|
||||
AMRMProtocol scheduler =
|
||||
currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||
@Override
|
||||
public AMRMProtocol run() {
|
||||
return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class,
|
||||
schedulerAddr, conf);
|
||||
}
|
||||
});
|
||||
|
||||
// Register the appMaster
|
||||
RegisterApplicationMasterRequest request =
|
||||
recordFactory
|
||||
.newRecordInstance(RegisterApplicationMasterRequest.class);
|
||||
request.setApplicationAttemptId(resourceManager.getRMContext()
|
||||
.getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
|
||||
scheduler.registerApplicationMaster(request);
|
||||
|
||||
// Now request a container allocation.
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest rr = recordFactory.newRecordInstance(ResourceRequest.class);
|
||||
rr.setCapability(recordFactory.newRecordInstance(Resource.class));
|
||||
rr.getCapability().setMemory(1024);
|
||||
rr.setHostName("*");
|
||||
rr.setNumContainers(1);
|
||||
rr.setPriority(recordFactory.newRecordInstance(Priority.class));
|
||||
rr.getPriority().setPriority(0);
|
||||
ask.add(rr);
|
||||
ArrayList<ContainerId> release = new ArrayList<ContainerId>();
|
||||
|
||||
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
|
||||
appAttempt.getAppAttemptId(), 0, 0F, ask, release);
|
||||
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
|
||||
.getAMResponse().getAllocatedContainers();
|
||||
|
||||
waitCounter = 0;
|
||||
while ((allocatedContainers == null || allocatedContainers.size() == 0)
|
||||
&& waitCounter++ != 20) {
|
||||
LOG.info("Waiting for container to be allocated..");
|
||||
Thread.sleep(1000);
|
||||
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
|
||||
allocatedContainers =
|
||||
scheduler.allocate(allocateRequest).getAMResponse()
|
||||
.getAllocatedContainers();
|
||||
}
|
||||
|
||||
Assert.assertNotNull("Container is not allocted!", allocatedContainers);
|
||||
Assert.assertEquals("Didn't get one container!", 1,
|
||||
allocatedContainers.size());
|
||||
|
||||
// Now talk to the NM for launching the container.
|
||||
final Container allocatedContainer = allocatedContainers.get(0);
|
||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||
Token<ContainerTokenIdentifier> token =
|
||||
new Token<ContainerTokenIdentifier>(
|
||||
containerToken.getIdentifier().array(),
|
||||
containerToken.getPassword().array(), new Text(
|
||||
containerToken.getKind()), new Text(
|
||||
containerToken.getService()));
|
||||
currentUser.addToken(token);
|
||||
currentUser.doAs(new PrivilegedAction<Void>() {
|
||||
@Override
|
||||
public Void run() {
|
||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||
ContainerManager.class, NetUtils
|
||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||
conf);
|
||||
try {
|
||||
LOG.info("Going to make a getContainerStatus() legal request");
|
||||
GetContainerStatusRequest request =
|
||||
recordFactory
|
||||
.newRecordInstance(GetContainerStatusRequest.class);
|
||||
ContainerId containerID =
|
||||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
recordFactory.newRecordInstance(ApplicationAttemptId.class);
|
||||
appAttemptId.setApplicationId(appID);
|
||||
appAttemptId.setAttemptId(1);
|
||||
appAttemptId.setApplicationId(appID);
|
||||
containerID.setApplicationAttemptId(appAttemptId);
|
||||
containerID.setId(1);
|
||||
request.setContainerId(containerID);
|
||||
client.getContainerStatus(request);
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Error", e);
|
||||
} catch (AvroRuntimeException e) {
|
||||
LOG.info("Got the expected exception");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
UserGroupInformation maliceUser =
|
||||
UserGroupInformation.createRemoteUser(currentUser.getShortUserName());
|
||||
byte[] identifierBytes = containerToken.getIdentifier().array();
|
||||
DataInputBuffer di = new DataInputBuffer();
|
||||
di.reset(identifierBytes, identifierBytes.length);
|
||||
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
|
||||
dummyIdentifier.readFields(di);
|
||||
Resource modifiedResource = recordFactory.newRecordInstance(Resource.class);
|
||||
modifiedResource.setMemory(2048);
|
||||
ContainerTokenIdentifier modifiedIdentifier =
|
||||
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
|
||||
dummyIdentifier.getNmHostName(), modifiedResource);
|
||||
// Malice user modifies the resource amount
|
||||
Token<ContainerTokenIdentifier> modifiedToken =
|
||||
new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
|
||||
containerToken.getPassword().array(), new Text(
|
||||
containerToken.getKind()), new Text(
|
||||
containerToken.getService()));
|
||||
maliceUser.addToken(modifiedToken);
|
||||
maliceUser.doAs(new PrivilegedAction<Void>() {
|
||||
@Override
|
||||
public Void run() {
|
||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||
ContainerManager.class, NetUtils
|
||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||
conf);
|
||||
ContainerId containerID;
|
||||
|
||||
LOG.info("Going to contact NM: ilLegal request");
|
||||
GetContainerStatusRequest request =
|
||||
recordFactory
|
||||
.newRecordInstance(GetContainerStatusRequest.class);
|
||||
containerID =
|
||||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
|
||||
appAttemptId.setApplicationId(appID);
|
||||
appAttemptId.setAttemptId(1);
|
||||
appAttemptId.setApplicationId(appID);
|
||||
containerID.setApplicationAttemptId(appAttemptId);
|
||||
containerID.setId(1);
|
||||
request.setContainerId(containerID);
|
||||
try {
|
||||
client.getContainerStatus(request);
|
||||
fail("Connection initiation with illegally modified "
|
||||
+ "tokens is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.error("Got exception", e);
|
||||
fail("Cannot get a YARN remote exception as " +
|
||||
"it will indicate RPC success");
|
||||
} catch (Exception e) {
|
||||
Assert.assertEquals(
|
||||
java.lang.reflect.UndeclaredThrowableException.class
|
||||
.getCanonicalName(), e.getClass().getCanonicalName());
|
||||
Assert
|
||||
.assertEquals(
|
||||
"DIGEST-MD5: digest response format violation. Mismatched response.",
|
||||
e.getCause().getCause().getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -12,7 +12,7 @@
|
|||
|
||||
# log4j configuration used during build and unit tests
|
||||
|
||||
log4j.rootLogger=info,stdout
|
||||
log4j.rootLogger=INFO,stdout
|
||||
log4j.threshhold=ALL
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
|
|
Loading…
Reference in New Issue