Merge -c 1194850 from trunk to branch-0.23 to fix MAPREDUCE-3256.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1194851 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-29 09:37:02 +00:00
parent bf72abd737
commit 6f2f253446
17 changed files with 757 additions and 500 deletions

View File

@ -1749,7 +1749,7 @@ Release 0.23.0 - Unreleased
etc. (Jonathan Eagles via acmurthy) etc. (Jonathan Eagles via acmurthy)
MAPREDUCE-3257. Added authorization checks for the protocol between MAPREDUCE-3257. Added authorization checks for the protocol between
ResourceManager and ApplicatoinMaster. (vinodkv via acmurthy) ResourceManager and ApplicationMaster. (vinodkv via acmurthy)
MAPREDUCE-3259. Added java.library.path of NodeManager to MAPREDUCE-3259. Added java.library.path of NodeManager to
ContainerLocalizer in LinuxContainerExecutor. (Kihwal Lee via acmurthy) ContainerLocalizer in LinuxContainerExecutor. (Kihwal Lee via acmurthy)
@ -1815,6 +1815,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3248. Fixed log4j properties. (vinodkv via acmurthy) MAPREDUCE-3248. Fixed log4j properties. (vinodkv via acmurthy)
MAPREDUCE-3256. Added authorization checks for the protocol between
NodeManager and ApplicationMaster. (vinodkv via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,11 +21,13 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -83,10 +85,14 @@ public class ContainerLauncherImpl extends AbstractService implements
private BlockingQueue<ContainerLauncherEvent> eventQueue = private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>(); new LinkedBlockingQueue<ContainerLauncherEvent>();
private RecordFactory recordFactory; private RecordFactory recordFactory;
//have a cache/map of UGIs so as to avoid creating too many RPC
//client connection objects to the same NodeManager // To track numNodes.
private ConcurrentMap<String, UserGroupInformation> ugiMap = Set<String> allNodes = new HashSet<String>();
new ConcurrentHashMap<String, UserGroupInformation>();
// have a cache/map of proxies so as to avoid creating multiple RPC
// client connection objects for the same container.
private Map<ContainerId, ContainerManager> clientCache
= new HashMap<ContainerId, ContainerManager>();
public ContainerLauncherImpl(AppContext context) { public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName()); super(ContainerLauncherImpl.class.getName());
@ -134,7 +140,7 @@ public class ContainerLauncherImpl extends AbstractService implements
// nodes where containers will run at *this* point of time. This is // nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be. // *not* the cluster size and doesn't need to be.
int numNodes = ugiMap.size(); int numNodes = allNodes.size();
int idealPoolSize = Math.min(limitOnPoolSize, numNodes); int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize <= idealPoolSize) { if (poolSize <= idealPoolSize) {
@ -142,7 +148,8 @@ public class ContainerLauncherImpl extends AbstractService implements
// later is just a buffer so we are not always increasing the // later is just a buffer so we are not always increasing the
// pool-size // pool-size
int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE; int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
LOG.debug("Setting pool size to " + newPoolSize); LOG.info("Setting ContainerLauncher pool size to "
+ newPoolSize);
launcherPool.setCorePoolSize(newPoolSize); launcherPool.setCorePoolSize(newPoolSize);
} }
} }
@ -167,37 +174,43 @@ public class ContainerLauncherImpl extends AbstractService implements
super.stop(); super.stop();
} }
protected ContainerManager getCMProxy( protected ContainerManager getCMProxy(ContainerId containerID,
final String containerManagerBindAddr, ContainerToken containerToken) final String containerManagerBindAddr, ContainerToken containerToken)
throws IOException { throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser(); UserGroupInformation user = UserGroupInformation.getCurrentUser();
if (UserGroupInformation.isSecurityEnabled()) { synchronized (this.clientCache) {
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>( if (this.clientCache.containsKey(containerID)) {
containerToken.getIdentifier().array(), containerToken return this.clientCache.get(containerID);
.getPassword().array(), new Text(containerToken.getKind()), }
new Text(containerToken.getService()));
// the user in createRemoteUser in this context is not important
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(containerManagerBindAddr);
ugi.addToken(token);
ugiMap.putIfAbsent(containerManagerBindAddr, ugi);
user = ugiMap.get(containerManagerBindAddr); this.allNodes.add(containerManagerBindAddr);
if (UserGroupInformation.isSecurityEnabled()) {
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
containerToken.getIdentifier().array(), containerToken
.getPassword().array(), new Text(containerToken.getKind()),
new Text(containerToken.getService()));
// the user in createRemoteUser in this context has to be ContainerID
user = UserGroupInformation.createRemoteUser(containerID.toString());
user.addToken(token);
}
ContainerManager proxy = user
.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
YarnRPC rpc = YarnRPC.create(getConfig());
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr),
getConfig());
}
});
this.clientCache.put(containerID, proxy);
return proxy;
} }
ContainerManager proxy =
user.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
YarnRPC rpc = YarnRPC.create(getConfig());
return (ContainerManager) rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr(containerManagerBindAddr),
getConfig());
}
});
return proxy;
} }
private static class CommandTimer extends TimerTask { private static class CommandTimer extends TimerTask {
@ -213,7 +226,6 @@ public class ContainerLauncherImpl extends AbstractService implements
+ ". Interrupting and returning"; + ". Interrupting and returning";
} }
@Override @Override
public void run() { public void run() {
LOG.warn(this.message); LOG.warn(this.message);
@ -255,8 +267,8 @@ public class ContainerLauncherImpl extends AbstractService implements
timer.schedule(new CommandTimer(Thread.currentThread(), event), timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut); nmTimeOut);
ContainerManager proxy = getCMProxy(containerManagerBindAddr, ContainerManager proxy = getCMProxy(containerID,
containerToken); containerManagerBindAddr, containerToken);
// Interruped during getProxy, but that didn't throw exception // Interruped during getProxy, but that didn't throw exception
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
@ -331,8 +343,8 @@ public class ContainerLauncherImpl extends AbstractService implements
timer.schedule(new CommandTimer(Thread.currentThread(), event), timer.schedule(new CommandTimer(Thread.currentThread(), event),
nmTimeOut); nmTimeOut);
ContainerManager proxy = getCMProxy(containerManagerBindAddr, ContainerManager proxy = getCMProxy(containerID,
containerToken); containerManagerBindAddr, containerToken);
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
// The timer cancelled the command in the mean while. No need to // The timer cancelled the command in the mean while. No need to

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.junit.Test; import org.junit.Test;
@ -109,7 +110,7 @@ public class TestContainerLauncher {
protected ContainerLauncher createContainerLauncher(AppContext context) { protected ContainerLauncher createContainerLauncher(AppContext context) {
return new ContainerLauncherImpl(context) { return new ContainerLauncherImpl(context) {
@Override @Override
protected ContainerManager getCMProxy( protected ContainerManager getCMProxy(ContainerId containerID,
String containerManagerBindAddr, ContainerToken containerToken) String containerManagerBindAddr, ContainerToken containerToken)
throws IOException { throws IOException {
try { try {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.junit.Test; import org.junit.Test;
@ -218,7 +219,7 @@ public class TestFail {
} }
@Override @Override
protected ContainerManager getCMProxy( protected ContainerManager getCMProxy(ContainerId contianerID,
String containerManagerBindAddr, ContainerToken containerToken) String containerManagerBindAddr, ContainerToken containerToken)
throws IOException { throws IOException {
try { try {

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.AMRMProtocol;
*/ */
@Public @Public
@Stable @Stable
public interface Resource extends Comparable<Resource> { public abstract class Resource implements Comparable<Resource> {
/** /**
* Get <em>memory</em> of the resource. * Get <em>memory</em> of the resource.
@ -53,5 +53,31 @@ public interface Resource extends Comparable<Resource> {
@Public @Public
@Stable @Stable
public abstract void setMemory(int memory); public abstract void setMemory(int memory);
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getMemory();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Resource other = (Resource) obj;
if (getMemory() != other.getMemory())
return false;
return true;
}
@Override
public String toString() {
return "memory: " + getMemory();
}
} }

View File

@ -19,14 +19,11 @@
package org.apache.hadoop.yarn.api.records.impl.pb; package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder;
public class ResourcePBImpl extends Resource {
public class ResourcePBImpl extends ProtoBase<ResourceProto> implements Resource {
ResourceProto proto = ResourceProto.getDefaultInstance(); ResourceProto proto = ResourceProto.getDefaultInstance();
ResourceProto.Builder builder = null; ResourceProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;

View File

@ -33,23 +33,22 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.BuilderUtils;
public class ContainerTokenIdentifier extends TokenIdentifier { public class ContainerTokenIdentifier extends TokenIdentifier {
private static Log LOG = LogFactory private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class);
.getLog(ContainerTokenIdentifier.class);
public static final Text KIND = new Text("ContainerToken"); public static final Text KIND = new Text("ContainerToken");
private ContainerId containerId; private ContainerId containerId;
private String nmHostName; private String nmHostAddr;
private Resource resource; private Resource resource;
public ContainerTokenIdentifier(ContainerId containerID, String hostName, public ContainerTokenIdentifier(ContainerId containerID, String hostName,
Resource r) { Resource r) {
this.containerId = containerID; this.containerId = containerID;
this.nmHostName = hostName; this.nmHostAddr = hostName;
this.resource = r; this.resource = r;
} }
@ -57,59 +56,46 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
} }
public ContainerId getContainerID() { public ContainerId getContainerID() {
return containerId; return this.containerId;
} }
public String getNmHostName() { public String getNmHostAddress() {
return nmHostName; return this.nmHostAddr;
} }
public Resource getResource() { public Resource getResource() {
return resource; return this.resource;
} }
@Override @Override
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifier to RPC layer"); LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId = this.containerId
containerId.getApplicationAttemptId(); .getApplicationAttemptId();
ApplicationId applicationId = applicationAttemptId.getApplicationId(); ApplicationId applicationId = applicationAttemptId.getApplicationId();
out.writeLong(applicationId.getClusterTimestamp()); out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId()); out.writeInt(applicationId.getId());
out.writeInt(applicationAttemptId.getAttemptId()); out.writeInt(applicationAttemptId.getAttemptId());
out.writeInt(this.containerId.getId()); out.writeInt(this.containerId.getId());
out.writeUTF(this.nmHostName); out.writeUTF(this.nmHostAddr);
out.writeInt(this.resource.getMemory()); out.writeInt(this.resource.getMemory());
} }
@Override @Override
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
this.containerId = ApplicationId applicationId = BuilderUtils.newApplicationId(
RecordFactoryProvider.getRecordFactory(null).newRecordInstance( in.readLong(), in.readInt());
ContainerId.class); ApplicationAttemptId applicationAttemptId = BuilderUtils
ApplicationAttemptId applicationAttemptId = .newApplicationAttemptId(applicationId, in.readInt());
RecordFactoryProvider.getRecordFactory(null).newRecordInstance( this.containerId = BuilderUtils.newContainerId(applicationAttemptId, in
ApplicationAttemptId.class); .readInt());
ApplicationId applicationId = this.nmHostAddr = in.readUTF();
RecordFactoryProvider.getRecordFactory(null).newRecordInstance( this.resource = BuilderUtils.newResource(in.readInt());
ApplicationId.class);
applicationId.setClusterTimestamp(in.readLong());
applicationId.setId(in.readInt());
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(in.readInt());
this.containerId.setApplicationAttemptId(applicationAttemptId);
this.containerId.setId(in.readInt());
this.nmHostName = in.readUTF();
this.resource =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
Resource.class);
this.resource.setMemory(in.readInt());
} }
@SuppressWarnings("static-access")
@Override @Override
public Text getKind() { public Text getKind() {
return this.KIND; return KIND;
} }
@Override @Override
@ -117,7 +103,6 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
return UserGroupInformation.createRemoteUser(this.containerId.toString()); return UserGroupInformation.createRemoteUser(this.containerId.toString());
} }
@InterfaceAudience.Private @InterfaceAudience.Private
public static class Renewer extends Token.TrivialRenewer { public static class Renewer extends Token.TrivialRenewer {
@Override @Override

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.security;
import java.util.Collection; import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -28,6 +30,10 @@ import org.apache.hadoop.security.token.TokenSelector;
public class ContainerTokenSelector implements public class ContainerTokenSelector implements
TokenSelector<ContainerTokenIdentifier> { TokenSelector<ContainerTokenIdentifier> {
private static final Log LOG = LogFactory
.getLog(ContainerTokenSelector.class);
@SuppressWarnings("unchecked")
@Override @Override
public Token<ContainerTokenIdentifier> selectToken(Text service, public Token<ContainerTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) { Collection<Token<? extends TokenIdentifier>> tokens) {
@ -35,6 +41,10 @@ public class ContainerTokenSelector implements
return null; return null;
} }
for (Token<? extends TokenIdentifier> token : tokens) { for (Token<? extends TokenIdentifier> token : tokens) {
if (LOG.isDebugEnabled()) {
LOG.info("Looking for service: " + service + ". Current token is "
+ token);
}
if (ContainerTokenIdentifier.KIND.equals(token.getKind()) && if (ContainerTokenIdentifier.KIND.equals(token.getKind()) &&
service.equals(token.getService())) { service.equals(token.getService())) {
return (Token<ContainerTokenIdentifier>) token; return (Token<ContainerTokenIdentifier>) token;

View File

@ -27,10 +27,11 @@ import java.util.Map;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@ -45,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -256,6 +256,12 @@ public class BuilderUtils {
return container; return container;
} }
public static Priority newPriority(int p) {
Priority priority = recordFactory.newRecordInstance(Priority.class);
priority.setPriority(p);
return priority;
}
public static ResourceRequest newResourceRequest(Priority priority, public static ResourceRequest newResourceRequest(Priority priority,
String hostName, Resource capability, int numContainers) { String hostName, Resource capability, int numContainers) {
ResourceRequest request = recordFactory ResourceRequest request = recordFactory

View File

@ -56,19 +56,19 @@ public class ContainerTokenSecretManager extends
@Override @Override
public byte[] createPassword(ContainerTokenIdentifier identifier) { public byte[] createPassword(ContainerTokenIdentifier identifier) {
LOG.debug("Creating password for " + identifier.getContainerID() LOG.debug("Creating password for " + identifier.getContainerID()
+ " to be run on NM " + identifier.getNmHostName() + " " + " to be run on NM " + identifier.getNmHostAddress() + " "
+ this.secretkeys.get(identifier.getNmHostName())); + this.secretkeys.get(identifier.getNmHostAddress()));
return createPassword(identifier.getBytes(), return createPassword(identifier.getBytes(),
this.secretkeys.get(identifier.getNmHostName())); this.secretkeys.get(identifier.getNmHostAddress()));
} }
@Override @Override
public byte[] retrievePassword(ContainerTokenIdentifier identifier) public byte[] retrievePassword(ContainerTokenIdentifier identifier)
throws org.apache.hadoop.security.token.SecretManager.InvalidToken { throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
LOG.debug("Retrieving password for " + identifier.getContainerID() LOG.debug("Retrieving password for " + identifier.getContainerID()
+ " to be run on NM " + identifier.getNmHostName()); + " to be run on NM " + identifier.getNmHostAddress());
return createPassword(identifier.getBytes(), return createPassword(identifier.getBytes(),
this.secretkeys.get(identifier.getNmHostName())); this.secretkeys.get(identifier.getNmHostAddress()));
} }
@Override @Override

View File

@ -27,18 +27,19 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@ -105,6 +108,7 @@ public class ContainerManagerImpl extends CompositeService implements
final Context context; final Context context;
private final ContainersMonitor containersMonitor; private final ContainersMonitor containersMonitor;
private Server server; private Server server;
private InetAddress resolvedAddress = null;
private final ResourceLocalizationService rsrcLocalizationSrvc; private final ResourceLocalizationService rsrcLocalizationSrvc;
private final ContainersLauncher containersLauncher; private final ContainersLauncher containersLauncher;
private final AuxServices auxiliaryServices; private final AuxServices auxiliaryServices;
@ -213,13 +217,12 @@ public class ContainerManagerImpl extends CompositeService implements
} }
server.start(); server.start();
InetAddress hostNameResolved = null;
try { try {
hostNameResolved = InetAddress.getLocalHost(); resolvedAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
throw new YarnException(e); throw new YarnException(e);
} }
this.context.getNodeId().setHost(hostNameResolved.getCanonicalHostName()); this.context.getNodeId().setHost(resolvedAddress.getCanonicalHostName());
this.context.getNodeId().setPort(server.getPort()); this.context.getNodeId().setPort(server.getPort());
LOG.info("ContainerManager started at " LOG.info("ContainerManager started at "
+ this.context.getNodeId().toString()); + this.context.getNodeId().toString());
@ -242,6 +245,79 @@ public class ContainerManagerImpl extends CompositeService implements
super.stop(); super.stop();
} }
/**
* Authorize the request.
*
* @param containerID
* of the container
* @param launchContext
* passed if verifying the startContainer, null otherwise.
* @throws YarnRemoteException
*/
private void authorizeRequest(ContainerId containerID,
ContainerLaunchContext launchContext) throws YarnRemoteException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
String containerIDStr = containerID.toString();
UserGroupInformation remoteUgi;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
String msg = "Cannot obtain the user-name for containerId: "
+ containerIDStr + ". Got exception: "
+ StringUtils.stringifyException(e);
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);
}
boolean unauthorized = false;
StringBuilder messageBuilder = new StringBuilder(
"Unauthorized request to start container. ");
if (!remoteUgi.getUserName().equals(containerIDStr)) {
unauthorized = true;
messageBuilder.append("\nExpected containerId: "
+ remoteUgi.getUserName() + " Found: " + containerIDStr);
}
if (launchContext != null) {
// Verify other things for startContainer() request.
if (LOG.isDebugEnabled()) {
LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
+ remoteUgi.getTokenIdentifiers().size());
}
// We must and should get only one TokenIdentifier from the RPC.
ContainerTokenIdentifier tokenId = (ContainerTokenIdentifier) remoteUgi
.getTokenIdentifiers().iterator().next();
if (tokenId == null) {
unauthorized = true;
messageBuilder
.append("\nContainerTokenIdentifier cannot be null! Null found for "
+ containerIDStr);
} else {
Resource resource = tokenId.getResource();
if (!resource.equals(launchContext.getResource())) {
unauthorized = true;
messageBuilder.append("\nExpected resource " + resource
+ " but found " + launchContext.getResource());
}
}
}
if (unauthorized) {
String msg = messageBuilder.toString();
LOG.error(msg);
throw RPCUtil.getRemoteException(msg);
}
}
/** /**
* Start a container on this NodeManager. * Start a container on this NodeManager.
*/ */
@ -251,8 +327,11 @@ public class ContainerManagerImpl extends CompositeService implements
throws YarnRemoteException { throws YarnRemoteException {
ContainerLaunchContext launchContext = request.getContainerLaunchContext(); ContainerLaunchContext launchContext = request.getContainerLaunchContext();
ContainerId containerID = launchContext.getContainerId();
authorizeRequest(containerID, launchContext);
LOG.info(" container is " + request); LOG.info(" container is " + request);
// //////////// Parse credentials // //////////// Parse credentials
ByteBuffer tokens = launchContext.getContainerTokens(); ByteBuffer tokens = launchContext.getContainerTokens();
Credentials credentials = new Credentials(); Credentials credentials = new Credentials();
@ -274,9 +353,8 @@ public class ContainerManagerImpl extends CompositeService implements
} }
// //////////// End of parsing credentials // //////////// End of parsing credentials
Container container = Container container = new ContainerImpl(getConfig(), this.dispatcher,
new ContainerImpl(getConfig(), this.dispatcher, launchContext, credentials, metrics); launchContext, credentials, metrics);
ContainerId containerID = launchContext.getContainerId();
ApplicationId applicationID = ApplicationId applicationID =
containerID.getApplicationAttemptId().getApplicationId(); containerID.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerID, container) != null) { if (context.getContainers().putIfAbsent(containerID, container) != null) {
@ -319,39 +397,36 @@ public class ContainerManagerImpl extends CompositeService implements
return response; return response;
} }
/**
* Stop the container running on this NodeManager.
*/
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public StopContainerResponse stopContainer(StopContainerRequest request) public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException { throws YarnRemoteException {
ContainerId containerID = request.getContainerId();
// TODO: Only the container's owner can kill containers today.
authorizeRequest(containerID, null);
StopContainerResponse response = StopContainerResponse response =
recordFactory.newRecordInstance(StopContainerResponse.class); recordFactory.newRecordInstance(StopContainerResponse.class);
ContainerId containerID = request.getContainerId();
Container container = this.context.getContainers().get(containerID); Container container = this.context.getContainers().get(containerID);
if (container == null) { if (container == null) {
LOG.warn("Trying to stop unknown container " + containerID); LOG.warn("Trying to stop unknown container " + containerID);
String userName; NMAuditLogger.logFailure("UnknownUser",
try {
userName = UserGroupInformation.getCurrentUser().getUserName();
} catch (IOException e) {
LOG.error("Error finding userName", e);
return response;
}
NMAuditLogger.logFailure(userName,
AuditConstants.STOP_CONTAINER, "ContainerManagerImpl", AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
"Trying to stop unknown container!", "Trying to stop unknown container!",
containerID.getApplicationAttemptId().getApplicationId(), containerID.getApplicationAttemptId().getApplicationId(),
containerID); containerID);
return response; // Return immediately. return response; // Return immediately.
} }
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID, new ContainerKillEvent(containerID,
"Container killed by the ApplicationMaster.")); "Container killed by the ApplicationMaster."));
// user logged here not ideal since just getting user from container but
// request doesn't have anything and should be coming from user of AM so
// should be the same or should be rejected by auth before here.
NMAuditLogger.logSuccess(container.getUser(), NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
containerID.getApplicationAttemptId().getApplicationId(), containerID.getApplicationAttemptId().getApplicationId(),
@ -365,20 +440,26 @@ public class ContainerManagerImpl extends CompositeService implements
} }
@Override @Override
public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException { public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
ContainerId containerID = request.getContainerId(); ContainerId containerID = request.getContainerId();
// TODO: Only the container's owner can get containers' status today.
authorizeRequest(containerID, null);
LOG.info("Getting container-status for " + containerID); LOG.info("Getting container-status for " + containerID);
Container container = this.context.getContainers().get(containerID); Container container = this.context.getContainers().get(containerID);
if (container != null) { if (container != null) {
ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus); LOG.info("Returning " + containerStatus);
GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class); GetContainerStatusResponse response = recordFactory
.newRecordInstance(GetContainerStatusResponse.class);
response.setStatus(containerStatus); response.setStatus(containerStatus);
return response; return response;
} else {
throw RPCUtil.getRemoteException("Container " + containerID
+ " is not handled by this NodeManager");
} }
throw RPCUtil.getRemoteException("Container " + containerID
+ " is not handled by this NodeManager");
} }
class ContainerEventDispatcher implements EventHandler<ContainerEvent> { class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
@ -412,8 +493,8 @@ public class ContainerManagerImpl extends CompositeService implements
} }
} }
@Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override
public void handle(ContainerManagerEvent event) { public void handle(ContainerManagerEvent event) {
switch (event.getType()) { switch (event.getType()) {
case FINISH_APPS: case FINISH_APPS:

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sec
import java.util.Collection; import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
@ -28,23 +30,23 @@ import org.apache.hadoop.security.token.TokenSelector;
public class LocalizerTokenSelector implements public class LocalizerTokenSelector implements
TokenSelector<LocalizerTokenIdentifier> { TokenSelector<LocalizerTokenIdentifier> {
private static final Log LOG = LogFactory
.getLog(LocalizerTokenSelector.class);
@SuppressWarnings("unchecked")
@Override @Override
public Token<LocalizerTokenIdentifier> selectToken(Text service, public Token<LocalizerTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) { Collection<Token<? extends TokenIdentifier>> tokens) {
System.err.print("=========== Using localizerTokenSelector");
// if (service == null) { LOG.debug("Using localizerTokenSelector.");
// return null;
// }
for (Token<? extends TokenIdentifier> token : tokens) { for (Token<? extends TokenIdentifier> token : tokens) {
System.err.print("============ token of kind " + token.getKind() + " is found"); LOG.debug("Token of kind " + token.getKind() + " is found");
if (LocalizerTokenIdentifier.KIND.equals(token.getKind()) if (LocalizerTokenIdentifier.KIND.equals(token.getKind())) {
//&& service.equals(token.getService())
) {
return (Token<LocalizerTokenIdentifier>) token; return (Token<LocalizerTokenIdentifier>) token;
} }
} }
System.err.print("returning null ========== "); LOG.debug("Returning null.");
return null; return null;
} }
} }

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -101,9 +100,7 @@ public class AMLauncher implements Runnable {
private void connect() throws IOException { private void connect() throws IOException {
ContainerId masterContainerID = application.getMasterContainer().getId(); ContainerId masterContainerID = application.getMasterContainer().getId();
containerMgrProxy = containerMgrProxy = getContainerMgrProxy(masterContainerID);
getContainerMgrProxy(
masterContainerID.getApplicationAttemptId().getApplicationId());
} }
private void launch() throws IOException { private void launch() throws IOException {
@ -133,7 +130,7 @@ public class AMLauncher implements Runnable {
} }
protected ContainerManager getContainerMgrProxy( protected ContainerManager getContainerMgrProxy(
final ApplicationId applicationID) throws IOException { final ContainerId containerId) {
Container container = application.getMasterContainer(); Container container = application.getMasterContainer();
@ -141,8 +138,8 @@ public class AMLauncher implements Runnable {
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
UserGroupInformation currentUser = UserGroupInformation currentUser = UserGroupInformation
UserGroupInformation.createRemoteUser("yarn"); // TODO .createRemoteUser(containerId.toString());
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
ContainerToken containerToken = container.getContainerToken(); ContainerToken containerToken = container.getContainerToken();
Token<ContainerTokenIdentifier> token = Token<ContainerTokenIdentifier> token =

View File

@ -133,7 +133,7 @@ public class TestApplicationMasterLauncher {
getConfig()) { getConfig()) {
@Override @Override
protected ContainerManager getContainerMgrProxy( protected ContainerManager getContainerMgrProxy(
ApplicationId applicationID) throws IOException { ContainerId containerId) {
return containerManager; return containerManager;
} }
}; };

View File

@ -0,0 +1,501 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestContainerManagerSecurity {
static Log LOG = LogFactory.getLog(TestContainerManagerSecurity.class);
static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private static FileContext localFS = null;
private static final File localDir = new File("target",
TestContainerManagerSecurity.class.getName() + "-localDir")
.getAbsoluteFile();
private static MiniYARNCluster yarnCluster;
static final Configuration conf = new Configuration();
@BeforeClass
public static void setup() throws AccessControlException,
FileNotFoundException, UnsupportedFileSystemException, IOException {
localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(localDir.getAbsolutePath()), true);
localDir.mkdir();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
// Set AM expiry interval to be very long.
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf);
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
.getName());
yarnCluster.init(conf);
yarnCluster.start();
}
@AfterClass
public static void teardown() {
yarnCluster.stop();
}
@Test
public void testAuthenticatedUser() throws IOException,
InterruptedException {
LOG.info("Running test for authenticated user");
ResourceManager resourceManager = yarnCluster.getResourceManager();
final YarnRPC yarnRPC = YarnRPC.create(conf);
// Submit an application
ApplicationId appID = resourceManager.getClientRMService()
.getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
.getApplicationId();
AMRMProtocol scheduler = submitAndRegisterApplication(resourceManager,
yarnRPC, appID);
// Now request a container.
final Container allocatedContainer = requestAndGetContainer(scheduler,
appID);
// Now talk to the NM for launching the container.
final ContainerId containerID = allocatedContainer.getId();
UserGroupInformation authenticatedUser = UserGroupInformation
.createRemoteUser(containerID.toString());
ContainerToken containerToken = allocatedContainer.getContainerToken();
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
containerToken.getIdentifier().array(), containerToken.getPassword()
.array(), new Text(containerToken.getKind()), new Text(
containerToken.getService()));
authenticatedUser.addToken(token);
authenticatedUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
LOG.info("Going to make a legal stopContainer() request");
StopContainerRequest request = recordFactory
.newRecordInstance(StopContainerRequest.class);
request.setContainerId(containerID);
client.stopContainer(request);
return null;
}
});
KillApplicationRequest request = Records
.newRecord(KillApplicationRequest.class);
request.setApplicationId(appID);
resourceManager.getClientRMService().forceKillApplication(request);
}
@Test
public void testMaliceUser() throws IOException, InterruptedException {
LOG.info("Running test for malice user");
ResourceManager resourceManager = yarnCluster.getResourceManager();
final YarnRPC yarnRPC = YarnRPC.create(conf);
// Submit an application
ApplicationId appID = resourceManager.getClientRMService()
.getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
.getApplicationId();
AMRMProtocol scheduler = submitAndRegisterApplication(resourceManager,
yarnRPC, appID);
// Now request a container.
final Container allocatedContainer = requestAndGetContainer(scheduler,
appID);
// Now talk to the NM for launching the container with modified resource
final ContainerId containerID = allocatedContainer.getId();
UserGroupInformation maliceUser = UserGroupInformation
.createRemoteUser(containerID.toString());
ContainerToken containerToken = allocatedContainer.getContainerToken();
byte[] identifierBytes = containerToken.getIdentifier().array();
DataInputBuffer di = new DataInputBuffer();
di.reset(identifierBytes, identifierBytes.length);
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
dummyIdentifier.readFields(di);
// Malice user modifies the resource amount
Resource modifiedResource = BuilderUtils.newResource(2048);
ContainerTokenIdentifier modifiedIdentifier = new ContainerTokenIdentifier(
dummyIdentifier.getContainerID(), dummyIdentifier.getNmHostAddress(),
modifiedResource);
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
new Text(containerToken.getKind()), new Text(containerToken
.getService()));
maliceUser.addToken(modifiedToken);
maliceUser.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
LOG.info("Going to contact NM: ilLegal request");
GetContainerStatusRequest request = recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(containerID);
try {
client.getContainerStatus(request);
fail("Connection initiation with illegally modified "
+ "tokens is expected to fail.");
} catch (YarnRemoteException e) {
LOG.error("Got exception", e);
fail("Cannot get a YARN remote exception as "
+ "it will indicate RPC success");
} catch (Exception e) {
Assert.assertEquals(
java.lang.reflect.UndeclaredThrowableException.class
.getCanonicalName(), e.getClass().getCanonicalName());
Assert.assertEquals(
"DIGEST-MD5: digest response format violation. "
+ "Mismatched response.", e.getCause().getCause()
.getMessage());
}
return null;
}
});
KillApplicationRequest request = Records
.newRecord(KillApplicationRequest.class);
request.setApplicationId(appID);
resourceManager.getClientRMService().forceKillApplication(request);
}
@Test
public void testUnauthorizedUser() throws IOException, InterruptedException {
LOG.info("\n\nRunning test for malice user");
ResourceManager resourceManager = yarnCluster.getResourceManager();
final YarnRPC yarnRPC = YarnRPC.create(conf);
// Submit an application
final ApplicationId appID = resourceManager.getClientRMService()
.getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
.getApplicationId();
AMRMProtocol scheduler = submitAndRegisterApplication(resourceManager,
yarnRPC, appID);
// Now request a container.
final Container allocatedContainer = requestAndGetContainer(scheduler,
appID);
// Now talk to the NM for launching the container with modified containerID
final ContainerId containerID = allocatedContainer.getId();
UserGroupInformation unauthorizedUser = UserGroupInformation
.createRemoteUser(containerID.toString());
ContainerToken containerToken = allocatedContainer.getContainerToken();
byte[] identifierBytes = containerToken.getIdentifier().array();
DataInputBuffer di = new DataInputBuffer();
di.reset(identifierBytes, identifierBytes.length);
final ContainerTokenIdentifier tokenId = new ContainerTokenIdentifier();
tokenId.readFields(di);
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
identifierBytes, containerToken.getPassword().array(), new Text(
containerToken.getKind()), new Text(containerToken.getService()));
unauthorizedUser.addToken(token);
unauthorizedUser.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
LOG.info("Going to contact NM: unauthorized request");
callWithIllegalContainerID(client, tokenId);
callWithIllegalResource(client, tokenId);
return null;
}
});
KillApplicationRequest request = Records
.newRecord(KillApplicationRequest.class);
request.setApplicationId(appID);
resourceManager.getClientRMService().forceKillApplication(request);
}
private AMRMProtocol submitAndRegisterApplication(
ResourceManager resourceManager, final YarnRPC yarnRPC,
ApplicationId appID) throws IOException,
UnsupportedFileSystemException, YarnRemoteException,
InterruptedException {
// TODO: Use a resource to work around bugs. Today NM doesn't create local
// app-dirs if there are no file to download!!
String fileName = "testFile-" + appID.toString();
File testFile = new File(localDir.getAbsolutePath(), fileName);
FileWriter tmpFile = new FileWriter(testFile);
tmpFile.write("testing");
tmpFile.close();
URL testFileURL = ConverterUtils.getYarnUrlFromPath(FileContext
.getFileContext().makeQualified(
new Path(localDir.getAbsolutePath(), fileName)));
LocalResource rsrc = BuilderUtils.newLocalResource(testFileURL,
LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, testFile
.length(), testFile.lastModified());
ContainerLaunchContext amContainer = BuilderUtils
.newContainerLaunchContext(null, "testUser", BuilderUtils
.newResource(1024), Collections.singletonMap(fileName, rsrc),
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
ApplicationSubmissionContext appSubmissionContext = recordFactory
.newRecordInstance(ApplicationSubmissionContext.class);
appSubmissionContext.setApplicationId(appID);
appSubmissionContext.setUser("testUser");
appSubmissionContext.setAMContainerSpec(amContainer);
SubmitApplicationRequest submitRequest = recordFactory
.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(appSubmissionContext);
resourceManager.getClientRMService().submitApplication(submitRequest);
// Wait till container gets allocated for AM
int waitCounter = 0;
RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
RMAppAttemptState state = appAttempt == null ? null : appAttempt
.getAppAttemptState();
while ((app == null || appAttempt == null || state == null || !state
.equals(RMAppAttemptState.LAUNCHED))
&& waitCounter++ != 20) {
LOG.info("Waiting for applicationAttempt to be created.. ");
Thread.sleep(1000);
app = resourceManager.getRMContext().getRMApps().get(appID);
appAttempt = app == null ? null : app.getCurrentAppAttempt();
state = appAttempt == null ? null : appAttempt.getAppAttemptState();
}
Assert.assertNotNull(app);
Assert.assertNotNull(appAttempt);
Assert.assertNotNull(state);
Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(
appAttempt.getAppAttemptId().toString());
// Ask for a container from the RM
String schedulerAddressString = conf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
final InetSocketAddress schedulerAddr = NetUtils
.createSocketAddr(schedulerAddressString);
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
appAttempt.getAppAttemptId());
ApplicationTokenSecretManager appTokenSecretManager = new ApplicationTokenSecretManager();
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
.createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
// ResourceManager.java
Token<ApplicationTokenIdentifier> appToken = new Token<ApplicationTokenIdentifier>(
appTokenIdentifier, appTokenSecretManager);
appToken.setService(new Text(schedulerAddressString));
currentUser.addToken(appToken);
AMRMProtocol scheduler = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override
public AMRMProtocol run() {
return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class,
schedulerAddr, conf);
}
});
// Register the appMaster
RegisterApplicationMasterRequest request = recordFactory
.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(resourceManager.getRMContext()
.getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
scheduler.registerApplicationMaster(request);
return scheduler;
}
private Container requestAndGetContainer(AMRMProtocol scheduler,
ApplicationId appID) throws YarnRemoteException, InterruptedException {
// Request a container allocation.
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
BuilderUtils.newResource(1024), 1));
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
new ArrayList<ContainerId>());
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
// Modify ask to request no more.
allocateRequest.clearAsks();
int waitCounter = 0;
while ((allocatedContainers == null || allocatedContainers.size() == 0)
&& waitCounter++ != 20) {
LOG.info("Waiting for container to be allocated..");
Thread.sleep(1000);
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);
Assert.assertEquals("Didn't get one container!", 1, allocatedContainers
.size());
return allocatedContainers.get(0);
}
void callWithIllegalContainerID(ContainerManager client,
ContainerTokenIdentifier tokenId) {
GetContainerStatusRequest request = recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils
.newApplicationAttemptId(tokenId.getContainerID()
.getApplicationAttemptId().getApplicationId(), 1), 42);
request.setContainerId(newContainerId); // Authenticated but
// unauthorized.
try {
client.getContainerStatus(request);
fail("Connection initiation with unauthorized "
+ "access is expected to fail.");
} catch (YarnRemoteException e) {
LOG.info("Got exception : ", e);
Assert.assertEquals("Unauthorized request to start container. "
+ "\nExpected containerId: " + tokenId.getContainerID()
+ " Found: " + newContainerId.toString(), e.getMessage());
}
}
void callWithIllegalResource(ContainerManager client,
ContainerTokenIdentifier tokenId) {
StartContainerRequest request = recordFactory
.newRecordInstance(StartContainerRequest.class);
// Authenticated but unauthorized, due to wrong resource
ContainerLaunchContext context = BuilderUtils.newContainerLaunchContext(
tokenId.getContainerID(), "testUser", BuilderUtils.newResource(2048),
new HashMap<String, LocalResource>(), new HashMap<String, String>(),
new ArrayList<String>(), new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
request.setContainerLaunchContext(context);
try {
client.startContainer(request);
fail("Connection initiation with unauthorized "
+ "access is expected to fail.");
} catch (YarnRemoteException e) {
LOG.info("Got exception : ", e);
Assert.assertTrue(e.getMessage().contains(
"Unauthorized request to start container. "));
Assert.assertTrue(e.getMessage().contains(
"\nExpected resource " + tokenId.getResource().toString()
+ " but found " + context.getResource().toString()));
}
}
}

View File

@ -1,365 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import junit.framework.Assert;
import org.apache.avro.AvroRuntimeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
public class TestContainerTokenSecretManager {
private static Log LOG = LogFactory
.getLog(TestContainerTokenSecretManager.class);
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private static FileContext localFS = null;
private static final File localDir = new File("target",
TestContainerTokenSecretManager.class.getName() + "-localDir")
.getAbsoluteFile();
private static MiniYARNCluster yarnCluster;
@BeforeClass
public static void setup() throws AccessControlException,
FileNotFoundException, UnsupportedFileSystemException, IOException {
localFS = FileContext.getLocalFSFileContext();
localFS.delete(new Path(localDir.getAbsolutePath()), true);
localDir.mkdir();
}
@AfterClass
public static void teardown() {
yarnCluster.stop();
}
@Test
public void test() throws IOException, InterruptedException {
final ApplicationId appID = recordFactory.newRecordInstance(ApplicationId.class);
appID.setClusterTimestamp(1234);
appID.setId(5);
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
// Set AM expiry interval to be very long.
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
UserGroupInformation.setConfiguration(conf);
yarnCluster =
new MiniYARNCluster(TestContainerTokenSecretManager.class.getName());
yarnCluster.init(conf);
yarnCluster.start();
ResourceManager resourceManager = yarnCluster.getResourceManager();
final YarnRPC yarnRPC = YarnRPC.create(conf);
// Submit an application
ApplicationSubmissionContext appSubmissionContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appSubmissionContext.setApplicationId(appID);
ContainerLaunchContext amContainer =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
amContainer.setResource(Resources.createResource(1024));
amContainer.setCommands(Arrays.asList("sleep", "100"));
appSubmissionContext.setUser("testUser");
// TODO: Use a resource to work around bugs. Today NM doesn't create local
// app-dirs if there are no file to download!!
File file = new File(localDir.getAbsolutePath(), "testFile");
FileWriter tmpFile = new FileWriter(file);
tmpFile.write("testing");
tmpFile.close();
URL testFileURL =
ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext()
.makeQualified(new Path(localDir.getAbsolutePath(), "testFile")));
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
rsrc.setResource(testFileURL);
rsrc.setSize(file.length());
rsrc.setTimestamp(file.lastModified());
rsrc.setType(LocalResourceType.FILE);
rsrc.setVisibility(LocalResourceVisibility.PRIVATE);
amContainer.setLocalResources(Collections.singletonMap("testFile", rsrc));
SubmitApplicationRequest submitRequest = recordFactory
.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(appSubmissionContext);
appSubmissionContext.setAMContainerSpec(amContainer);
resourceManager.getClientRMService().submitApplication(submitRequest);
// Wait till container gets allocated for AM
int waitCounter = 0;
RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
RMAppAttemptState state = appAttempt == null ? null : appAttempt
.getAppAttemptState();
while ((app == null || appAttempt == null || state == null
|| !state.equals(RMAppAttemptState.LAUNCHED)) && waitCounter++ != 20) {
LOG.info("Waiting for applicationAttempt to be created.. ");
Thread.sleep(1000);
app = resourceManager.getRMContext().getRMApps().get(appID);
appAttempt = app == null ? null : app.getCurrentAppAttempt();
state = appAttempt == null ? null : appAttempt.getAppAttemptState();
}
Assert.assertNotNull(app);
Assert.assertNotNull(appAttempt);
Assert.assertNotNull(state);
Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
// Ask for a container from the RM
String schedulerAddressString =
conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
final InetSocketAddress schedulerAddr =
NetUtils.createSocketAddr(schedulerAddressString);
ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
appAttempt.getAppAttemptId());
ApplicationTokenSecretManager appTokenSecretManager =
new ApplicationTokenSecretManager();
appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
.createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
// ResourceManager.java
Token<ApplicationTokenIdentifier> appToken =
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
appTokenSecretManager);
appToken.setService(new Text(schedulerAddressString));
currentUser.addToken(appToken);
AMRMProtocol scheduler =
currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override
public AMRMProtocol run() {
return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class,
schedulerAddr, conf);
}
});
// Register the appMaster
RegisterApplicationMasterRequest request =
recordFactory
.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(resourceManager.getRMContext()
.getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
scheduler.registerApplicationMaster(request);
// Now request a container allocation.
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest rr = recordFactory.newRecordInstance(ResourceRequest.class);
rr.setCapability(recordFactory.newRecordInstance(Resource.class));
rr.getCapability().setMemory(1024);
rr.setHostName("*");
rr.setNumContainers(1);
rr.setPriority(recordFactory.newRecordInstance(Priority.class));
rr.getPriority().setPriority(0);
ask.add(rr);
ArrayList<ContainerId> release = new ArrayList<ContainerId>();
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
appAttempt.getAppAttemptId(), 0, 0F, ask, release);
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers();
waitCounter = 0;
while ((allocatedContainers == null || allocatedContainers.size() == 0)
&& waitCounter++ != 20) {
LOG.info("Waiting for container to be allocated..");
Thread.sleep(1000);
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers =
scheduler.allocate(allocateRequest).getAMResponse()
.getAllocatedContainers();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);
Assert.assertEquals("Didn't get one container!", 1,
allocatedContainers.size());
// Now talk to the NM for launching the container.
final Container allocatedContainer = allocatedContainers.get(0);
ContainerToken containerToken = allocatedContainer.getContainerToken();
Token<ContainerTokenIdentifier> token =
new Token<ContainerTokenIdentifier>(
containerToken.getIdentifier().array(),
containerToken.getPassword().array(), new Text(
containerToken.getKind()), new Text(
containerToken.getService()));
currentUser.addToken(token);
currentUser.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
try {
LOG.info("Going to make a getContainerStatus() legal request");
GetContainerStatusRequest request =
recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
ContainerId containerID =
recordFactory.newRecordInstance(ContainerId.class);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appID);
appAttemptId.setAttemptId(1);
appAttemptId.setApplicationId(appID);
containerID.setApplicationAttemptId(appAttemptId);
containerID.setId(1);
request.setContainerId(containerID);
client.getContainerStatus(request);
} catch (YarnRemoteException e) {
LOG.info("Error", e);
} catch (AvroRuntimeException e) {
LOG.info("Got the expected exception");
}
return null;
}
});
UserGroupInformation maliceUser =
UserGroupInformation.createRemoteUser(currentUser.getShortUserName());
byte[] identifierBytes = containerToken.getIdentifier().array();
DataInputBuffer di = new DataInputBuffer();
di.reset(identifierBytes, identifierBytes.length);
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
dummyIdentifier.readFields(di);
Resource modifiedResource = recordFactory.newRecordInstance(Resource.class);
modifiedResource.setMemory(2048);
ContainerTokenIdentifier modifiedIdentifier =
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
dummyIdentifier.getNmHostName(), modifiedResource);
// Malice user modifies the resource amount
Token<ContainerTokenIdentifier> modifiedToken =
new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
containerToken.getPassword().array(), new Text(
containerToken.getKind()), new Text(
containerToken.getService()));
maliceUser.addToken(modifiedToken);
maliceUser.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
ContainerManager.class, NetUtils
.createSocketAddr(allocatedContainer.getNodeId().toString()),
conf);
ContainerId containerID;
LOG.info("Going to contact NM: ilLegal request");
GetContainerStatusRequest request =
recordFactory
.newRecordInstance(GetContainerStatusRequest.class);
containerID =
recordFactory.newRecordInstance(ContainerId.class);
ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appID);
appAttemptId.setAttemptId(1);
appAttemptId.setApplicationId(appID);
containerID.setApplicationAttemptId(appAttemptId);
containerID.setId(1);
request.setContainerId(containerID);
try {
client.getContainerStatus(request);
fail("Connection initiation with illegally modified "
+ "tokens is expected to fail.");
} catch (YarnRemoteException e) {
LOG.error("Got exception", e);
fail("Cannot get a YARN remote exception as " +
"it will indicate RPC success");
} catch (Exception e) {
Assert.assertEquals(
java.lang.reflect.UndeclaredThrowableException.class
.getCanonicalName(), e.getClass().getCanonicalName());
Assert
.assertEquals(
"DIGEST-MD5: digest response format violation. Mismatched response.",
e.getCause().getCause().getMessage());
}
return null;
}
});
}
}

View File

@ -12,7 +12,7 @@
# log4j configuration used during build and unit tests # log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout log4j.rootLogger=INFO,stdout
log4j.threshhold=ALL log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout=org.apache.log4j.PatternLayout