MAPREDUCE-3940. ContainerTokens should have an expiry interval. Contributed by Siddharth Seth and Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1359910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3b5ea87502
commit
3bfb26ad3b
|
@ -670,6 +670,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via
|
MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via
|
||||||
bobby)
|
bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-3940. ContainerTokens should have an expiry interval. (Siddharth
|
||||||
|
Seth and Vinod Kumar Vavilapalli via vinodkv)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -86,11 +87,13 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -352,7 +355,7 @@ public class TestRMContainerAllocator {
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
protected ResourceScheduler createScheduler() {
|
protected ResourceScheduler createScheduler() {
|
||||||
return new MyFifoScheduler();
|
return new MyFifoScheduler(this.getRMContext());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1091,6 +1094,19 @@ public class TestRMContainerAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class MyFifoScheduler extends FifoScheduler {
|
private static class MyFifoScheduler extends FifoScheduler {
|
||||||
|
|
||||||
|
public MyFifoScheduler(RMContext rmContext) {
|
||||||
|
super();
|
||||||
|
try {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
reinitialize(conf, new ContainerTokenSecretManager(conf),
|
||||||
|
rmContext);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.info("add application failed with ", ie);
|
||||||
|
assert (false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// override this to copy the objects otherwise FifoScheduler updates the
|
// override this to copy the objects otherwise FifoScheduler updates the
|
||||||
// numContainers in same objects as kept by RMContainerAllocator
|
// numContainers in same objects as kept by RMContainerAllocator
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -50,11 +50,11 @@ public interface ContainerToken extends DelegationToken {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract ByteBuffer getIdentifier();
|
ByteBuffer getIdentifier();
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Stable
|
@Stable
|
||||||
public abstract void setIdentifier(ByteBuffer identifier);
|
void setIdentifier(ByteBuffer identifier);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the token password
|
* Get the token password
|
||||||
|
@ -62,11 +62,11 @@ public interface ContainerToken extends DelegationToken {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract ByteBuffer getPassword();
|
ByteBuffer getPassword();
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Stable
|
@Stable
|
||||||
public abstract void setPassword(ByteBuffer password);
|
void setPassword(ByteBuffer password);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the token kind.
|
* Get the token kind.
|
||||||
|
@ -74,11 +74,11 @@ public interface ContainerToken extends DelegationToken {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract String getKind();
|
String getKind();
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Stable
|
@Stable
|
||||||
public abstract void setKind(String kind);
|
void setKind(String kind);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the service to which the token is allocated.
|
* Get the service to which the token is allocated.
|
||||||
|
@ -86,10 +86,10 @@ public interface ContainerToken extends DelegationToken {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract String getService();
|
String getService();
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Stable
|
@Stable
|
||||||
public abstract void setService(String service);
|
void setService(String service);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.impl.pb.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.io.Closeable;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
|
@ -49,7 +50,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainerRequestProto;
|
||||||
|
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
public class ContainerManagerPBClientImpl implements ContainerManager {
|
public class ContainerManagerPBClientImpl implements ContainerManager,
|
||||||
|
Closeable {
|
||||||
|
|
||||||
// Not a documented config. Only used for tests
|
// Not a documented config. Only used for tests
|
||||||
static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
|
static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
|
||||||
|
|
|
@ -35,6 +35,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TokenIdentifier for a container. Encodes {@link ContainerId},
|
||||||
|
* {@link Resource} needed by the container and the target NMs host-address.
|
||||||
|
*
|
||||||
|
*/
|
||||||
public class ContainerTokenIdentifier extends TokenIdentifier {
|
public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class);
|
private static Log LOG = LogFactory.getLog(ContainerTokenIdentifier.class);
|
||||||
|
@ -44,14 +49,19 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
private ContainerId containerId;
|
private ContainerId containerId;
|
||||||
private String nmHostAddr;
|
private String nmHostAddr;
|
||||||
private Resource resource;
|
private Resource resource;
|
||||||
|
private long expiryTimeStamp;
|
||||||
|
|
||||||
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||||
Resource r) {
|
Resource r, long expiryTimeStamp) {
|
||||||
this.containerId = containerID;
|
this.containerId = containerID;
|
||||||
this.nmHostAddr = hostName;
|
this.nmHostAddr = hostName;
|
||||||
this.resource = r;
|
this.resource = r;
|
||||||
|
this.expiryTimeStamp = expiryTimeStamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default constructor needed by RPC layer/SecretManager.
|
||||||
|
*/
|
||||||
public ContainerTokenIdentifier() {
|
public ContainerTokenIdentifier() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,6 +77,10 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
return this.resource;
|
return this.resource;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getExpiryTimeStamp() {
|
||||||
|
return this.expiryTimeStamp;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
|
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
|
||||||
|
@ -79,6 +93,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
out.writeInt(this.containerId.getId());
|
out.writeInt(this.containerId.getId());
|
||||||
out.writeUTF(this.nmHostAddr);
|
out.writeUTF(this.nmHostAddr);
|
||||||
out.writeInt(this.resource.getMemory());
|
out.writeInt(this.resource.getMemory());
|
||||||
|
out.writeLong(this.expiryTimeStamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -91,6 +106,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
.readInt());
|
.readInt());
|
||||||
this.nmHostAddr = in.readUTF();
|
this.nmHostAddr = in.readUTF();
|
||||||
this.resource = BuilderUtils.newResource(in.readInt());
|
this.resource = BuilderUtils.newResource(in.readInt());
|
||||||
|
this.expiryTimeStamp = in.readLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -103,6 +119,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
return UserGroupInformation.createRemoteUser(this.containerId.toString());
|
return UserGroupInformation.createRemoteUser(this.containerId.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Needed?
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static class Renewer extends Token.TrivialRenewer {
|
public static class Renewer extends Token.TrivialRenewer {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.security;
|
package org.apache.hadoop.yarn.server.security;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@ -25,9 +26,21 @@ import javax.crypto.SecretKey;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SecretManager for ContainerTokens. Used by both RM and NM and hence is
|
||||||
|
* present in yarn-server-common package.
|
||||||
|
*
|
||||||
|
*/
|
||||||
public class ContainerTokenSecretManager extends
|
public class ContainerTokenSecretManager extends
|
||||||
SecretManager<ContainerTokenIdentifier> {
|
SecretManager<ContainerTokenIdentifier> {
|
||||||
|
|
||||||
|
@ -37,6 +50,33 @@ public class ContainerTokenSecretManager extends
|
||||||
Map<String, SecretKey> secretkeys =
|
Map<String, SecretKey> secretkeys =
|
||||||
new ConcurrentHashMap<String, SecretKey>();
|
new ConcurrentHashMap<String, SecretKey>();
|
||||||
|
|
||||||
|
private final long containerTokenExpiryInterval;
|
||||||
|
|
||||||
|
public ContainerTokenSecretManager(Configuration conf) {
|
||||||
|
this.containerTokenExpiryInterval =
|
||||||
|
conf.getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerToken createContainerToken(ContainerId containerId,
|
||||||
|
NodeId nodeId, Resource capability) {
|
||||||
|
try {
|
||||||
|
long expiryTimeStamp =
|
||||||
|
System.currentTimeMillis() + containerTokenExpiryInterval;
|
||||||
|
ContainerTokenIdentifier tokenIdentifier =
|
||||||
|
new ContainerTokenIdentifier(containerId, nodeId.toString(),
|
||||||
|
capability, expiryTimeStamp);
|
||||||
|
return BuilderUtils.newContainerToken(nodeId,
|
||||||
|
ByteBuffer.wrap(this.createPassword(tokenIdentifier)), tokenIdentifier);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
// this could be because DNS is down - in which case we just want
|
||||||
|
// to retry and not bring RM down. Caller should note and act on the fact
|
||||||
|
// that container is not creatable.
|
||||||
|
LOG.error("Error trying to create new container", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Used by master for generation of secretyKey per host
|
// Used by master for generation of secretyKey per host
|
||||||
public SecretKey createAndGetSecretKey(CharSequence hostName) {
|
public SecretKey createAndGetSecretKey(CharSequence hostName) {
|
||||||
String hostNameStr = hostName.toString();
|
String hostNameStr = hostName.toString();
|
||||||
|
|
|
@ -116,7 +116,7 @@ public class NodeManager extends CompositeService implements
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
LOG.info("Security is enabled on NodeManager. "
|
LOG.info("Security is enabled on NodeManager. "
|
||||||
+ "Creating ContainerTokenSecretManager");
|
+ "Creating ContainerTokenSecretManager");
|
||||||
this.containerTokenSecretManager = new ContainerTokenSecretManager();
|
this.containerTokenSecretManager = new ContainerTokenSecretManager(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.aclsManager = new ApplicationACLsManager(conf);
|
this.aclsManager = new ApplicationACLsManager(conf);
|
||||||
|
|
|
@ -324,6 +324,15 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
+ containerIDStr);
|
+ containerIDStr);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
// Ensure the token is not expired.
|
||||||
|
// Token expiry is not checked for stopContainer/getContainerStatus
|
||||||
|
if (tokenId.getExpiryTimeStamp() < System.currentTimeMillis()) {
|
||||||
|
unauthorized = true;
|
||||||
|
messageBuilder.append("\nThis token is expired. current time is "
|
||||||
|
+ System.currentTimeMillis() + " found "
|
||||||
|
+ tokenId.getExpiryTimeStamp());
|
||||||
|
}
|
||||||
|
|
||||||
Resource resource = tokenId.getResource();
|
Resource resource = tokenId.getResource();
|
||||||
if (!resource.equals(launchContext.getResource())) {
|
if (!resource.equals(launchContext.getResource())) {
|
||||||
unauthorized = true;
|
unauthorized = true;
|
||||||
|
|
|
@ -86,7 +86,8 @@ public class TestEventFlow {
|
||||||
healthChecker.init(conf);
|
healthChecker.init(conf);
|
||||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||||
NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
|
ContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
new ContainerTokenSecretManager(conf);
|
||||||
NodeStatusUpdater nodeStatusUpdater =
|
NodeStatusUpdater nodeStatusUpdater =
|
||||||
new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, containerTokenSecretManager) {
|
new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, containerTokenSecretManager) {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -70,7 +70,8 @@ public abstract class BaseContainerManagerTest {
|
||||||
protected static File localLogDir;
|
protected static File localLogDir;
|
||||||
protected static File remoteLogDir;
|
protected static File remoteLogDir;
|
||||||
protected static File tmpDir;
|
protected static File tmpDir;
|
||||||
protected ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
|
protected ContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
new ContainerTokenSecretManager(new Configuration());
|
||||||
|
|
||||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
|
|
||||||
|
|
|
@ -385,7 +385,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
delSrvc.init(conf);
|
delSrvc.init(conf);
|
||||||
|
|
||||||
ContainerTokenSecretManager containerTokenSecretManager = new
|
ContainerTokenSecretManager containerTokenSecretManager = new
|
||||||
ContainerTokenSecretManager();
|
ContainerTokenSecretManager(conf);
|
||||||
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
|
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
|
||||||
nodeStatusUpdater, metrics, containerTokenSecretManager,
|
nodeStatusUpdater, metrics, containerTokenSecretManager,
|
||||||
new ApplicationACLsManager(conf), dirsHandler);
|
new ApplicationACLsManager(conf), dirsHandler);
|
||||||
|
|
|
@ -100,8 +100,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
protected ClientToAMSecretManager clientToAMSecretManager =
|
protected ClientToAMSecretManager clientToAMSecretManager =
|
||||||
new ClientToAMSecretManager();
|
new ClientToAMSecretManager();
|
||||||
|
|
||||||
protected ContainerTokenSecretManager containerTokenSecretManager =
|
protected ContainerTokenSecretManager containerTokenSecretManager;
|
||||||
new ContainerTokenSecretManager();
|
|
||||||
|
|
||||||
protected ApplicationTokenSecretManager appTokenSecretManager;
|
protected ApplicationTokenSecretManager appTokenSecretManager;
|
||||||
|
|
||||||
|
@ -151,6 +150,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
this.rmDispatcher);
|
this.rmDispatcher);
|
||||||
addService(this.containerAllocationExpirer);
|
addService(this.containerAllocationExpirer);
|
||||||
|
|
||||||
|
this.containerTokenSecretManager = new ContainerTokenSecretManager(conf);
|
||||||
|
|
||||||
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
|
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
|
||||||
addService(amLivelinessMonitor);
|
addService(amLivelinessMonitor);
|
||||||
|
|
||||||
|
@ -611,6 +612,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
return this.applicationACLsManager;
|
return this.applicationACLsManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public ContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||||
|
return this.containerTokenSecretManager;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
|
public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
|
||||||
return this.appTokenSecretManager;
|
return this.appTokenSecretManager;
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
|
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
|
||||||
|
|
||||||
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class ContainerAllocationExpirer extends
|
public class ContainerAllocationExpirer extends
|
||||||
AbstractLivelinessMonitor<ContainerId> {
|
AbstractLivelinessMonitor<ContainerId> {
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -54,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
|
@ -1178,17 +1176,11 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
// If security is enabled, send the container-tokens too.
|
// If security is enabled, send the container-tokens too.
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
|
containerToken =
|
||||||
containerId, nodeId.toString(), capability);
|
containerTokenSecretManager.createContainerToken(containerId, nodeId,
|
||||||
try {
|
capability);
|
||||||
containerToken = BuilderUtils.newContainerToken(nodeId, ByteBuffer
|
if (containerToken == null) {
|
||||||
.wrap(containerTokenSecretManager
|
return null; // Try again later.
|
||||||
.createPassword(tokenIdentifier)), tokenIdentifier);
|
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
// this could be because DNS is down - in which case we just want
|
|
||||||
// to retry and not bring RM down
|
|
||||||
LOG.error("Error trying to create new container", e);
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -32,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -55,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -541,11 +538,12 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
|
|
||||||
// If security is enabled, send the container-tokens too.
|
// If security is enabled, send the container-tokens too.
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
|
containerToken =
|
||||||
containerId, nodeId.toString(), capability);
|
containerTokenSecretManager.createContainerToken(containerId,
|
||||||
containerToken = BuilderUtils.newContainerToken(nodeId, ByteBuffer
|
nodeId, capability);
|
||||||
.wrap(containerTokenSecretManager
|
if (containerToken == null) {
|
||||||
.createPassword(tokenIdentifier)), tokenIdentifier);
|
return i; // Try again later.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
|
@ -562,11 +560,11 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
// Inform the node
|
// Inform the node
|
||||||
node.allocateContainer(application.getApplicationId(),
|
node.allocateContainer(application.getApplicationId(),
|
||||||
rmContainer);
|
rmContainer);
|
||||||
|
|
||||||
|
// Update usage for this container
|
||||||
|
Resources.addTo(usedResource, capability);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update total usage
|
|
||||||
Resources.addTo(usedResource,
|
|
||||||
Resources.multiply(capability, assignedContainers));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return assignedContainers;
|
return assignedContainers;
|
||||||
|
|
|
@ -52,8 +52,6 @@ public class TestNMExpiry {
|
||||||
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
ResourceTrackerService resourceTrackerService;
|
ResourceTrackerService resourceTrackerService;
|
||||||
ContainerTokenSecretManager containerTokenSecretManager =
|
|
||||||
new ContainerTokenSecretManager();
|
|
||||||
|
|
||||||
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
|
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
|
||||||
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
|
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
|
||||||
|
@ -84,6 +82,8 @@ public class TestNMExpiry {
|
||||||
nmLivelinessMonitor.start();
|
nmLivelinessMonitor.start();
|
||||||
NodesListManager nodesListManager = new NodesListManager(context);
|
NodesListManager nodesListManager = new NodesListManager(context);
|
||||||
nodesListManager.init(conf);
|
nodesListManager.init(conf);
|
||||||
|
ContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
new ContainerTokenSecretManager(conf);
|
||||||
resourceTrackerService = new ResourceTrackerService(context,
|
resourceTrackerService = new ResourceTrackerService(context,
|
||||||
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager);
|
nodesListManager, nmLivelinessMonitor, containerTokenSecretManager);
|
||||||
|
|
||||||
|
|
|
@ -52,8 +52,6 @@ import org.junit.Test;
|
||||||
public class TestRMNMRPCResponseId {
|
public class TestRMNMRPCResponseId {
|
||||||
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
ResourceTrackerService resourceTrackerService;
|
ResourceTrackerService resourceTrackerService;
|
||||||
ContainerTokenSecretManager containerTokenSecretManager =
|
|
||||||
new ContainerTokenSecretManager();
|
|
||||||
private NodeId nodeId;
|
private NodeId nodeId;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -73,6 +71,8 @@ public class TestRMNMRPCResponseId {
|
||||||
NodesListManager nodesListManager = new NodesListManager(context);
|
NodesListManager nodesListManager = new NodesListManager(context);
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
nodesListManager.init(conf);
|
nodesListManager.init(conf);
|
||||||
|
ContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
new ContainerTokenSecretManager(conf);
|
||||||
resourceTrackerService = new ResourceTrackerService(context,
|
resourceTrackerService = new ResourceTrackerService(context,
|
||||||
nodesListManager, new NMLivelinessMonitor(dispatcher),
|
nodesListManager, new NMLivelinessMonitor(dispatcher),
|
||||||
containerTokenSecretManager);
|
containerTokenSecretManager);
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
@ -214,11 +215,12 @@ public class TestContainerManagerSecurity {
|
||||||
|
|
||||||
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
|
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
|
||||||
dummyIdentifier.readFields(di);
|
dummyIdentifier.readFields(di);
|
||||||
|
|
||||||
// Malice user modifies the resource amount
|
// Malice user modifies the resource amount
|
||||||
Resource modifiedResource = BuilderUtils.newResource(2048);
|
Resource modifiedResource = BuilderUtils.newResource(2048);
|
||||||
ContainerTokenIdentifier modifiedIdentifier = new ContainerTokenIdentifier(
|
ContainerTokenIdentifier modifiedIdentifier = new ContainerTokenIdentifier(
|
||||||
dummyIdentifier.getContainerID(), dummyIdentifier.getNmHostAddress(),
|
dummyIdentifier.getContainerID(), dummyIdentifier.getNmHostAddress(),
|
||||||
modifiedResource);
|
modifiedResource, Long.MAX_VALUE);
|
||||||
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
||||||
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
||||||
new Text(containerToken.getKind()), new Text(containerToken
|
new Text(containerToken.getKind()), new Text(containerToken
|
||||||
|
@ -288,6 +290,7 @@ public class TestContainerManagerSecurity {
|
||||||
// Now talk to the NM for launching the container with modified containerID
|
// Now talk to the NM for launching the container with modified containerID
|
||||||
final ContainerId containerID = allocatedContainer.getId();
|
final ContainerId containerID = allocatedContainer.getId();
|
||||||
|
|
||||||
|
/////////// Test calls with illegal containerIDs and illegal Resources
|
||||||
UserGroupInformation unauthorizedUser = UserGroupInformation
|
UserGroupInformation unauthorizedUser = UserGroupInformation
|
||||||
.createRemoteUser(containerID.toString());
|
.createRemoteUser(containerID.toString());
|
||||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||||
|
@ -303,9 +306,10 @@ public class TestContainerManagerSecurity {
|
||||||
containerToken.getKind()), new Text(containerToken.getService()));
|
containerToken.getKind()), new Text(containerToken.getService()));
|
||||||
|
|
||||||
unauthorizedUser.addToken(token);
|
unauthorizedUser.addToken(token);
|
||||||
unauthorizedUser.doAs(new PrivilegedAction<Void>() {
|
ContainerManager client =
|
||||||
|
unauthorizedUser.doAs(new PrivilegedAction<ContainerManager>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() {
|
public ContainerManager run() {
|
||||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||||
ContainerManager.class, NetUtils
|
ContainerManager.class, NetUtils
|
||||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||||
|
@ -316,9 +320,69 @@ public class TestContainerManagerSecurity {
|
||||||
callWithIllegalContainerID(client, tokenId);
|
callWithIllegalContainerID(client, tokenId);
|
||||||
callWithIllegalResource(client, tokenId);
|
callWithIllegalResource(client, tokenId);
|
||||||
|
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/////////// End of testing for illegal containerIDs and illegal Resources
|
||||||
|
|
||||||
|
/////////// Test calls with expired tokens
|
||||||
|
RPC.stopProxy(client);
|
||||||
|
unauthorizedUser = UserGroupInformation
|
||||||
|
.createRemoteUser(containerID.toString());
|
||||||
|
|
||||||
|
final ContainerTokenIdentifier newTokenId =
|
||||||
|
new ContainerTokenIdentifier(tokenId.getContainerID(),
|
||||||
|
tokenId.getNmHostAddress(), tokenId.getResource(),
|
||||||
|
System.currentTimeMillis() - 1);
|
||||||
|
byte[] passowrd =
|
||||||
|
resourceManager.getContainerTokenSecretManager().createPassword(
|
||||||
|
newTokenId);
|
||||||
|
// Create a valid token by using the key from the RM.
|
||||||
|
token = new Token<ContainerTokenIdentifier>(
|
||||||
|
newTokenId.getBytes(), passowrd, new Text(
|
||||||
|
containerToken.getKind()), new Text(containerToken.getService()));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
unauthorizedUser.addToken(token);
|
||||||
|
unauthorizedUser.doAs(new PrivilegedAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() {
|
||||||
|
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||||
|
ContainerManager.class, NetUtils
|
||||||
|
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||||
|
conf);
|
||||||
|
|
||||||
|
LOG.info("Going to contact NM with expired token");
|
||||||
|
ContainerLaunchContext context = createContainerLaunchContextForTest(newTokenId);
|
||||||
|
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
|
||||||
|
request.setContainerLaunchContext(context);
|
||||||
|
|
||||||
|
//Calling startContainer with an expired token.
|
||||||
|
try {
|
||||||
|
client.startContainer(request);
|
||||||
|
fail("Connection initiation with expired "
|
||||||
|
+ "token is expected to fail.");
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.info("Got exception : ", t);
|
||||||
|
Assert.assertTrue(t.getMessage().contains(
|
||||||
|
"This token is expired. current time is"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try stopping a container - should not get an expiry error.
|
||||||
|
StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
|
||||||
|
stopRequest.setContainerId(newTokenId.getContainerID());
|
||||||
|
try {
|
||||||
|
client.stopContainer(stopRequest);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
fail("Stop Container call should have succeeded");
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
/////////// End of testing calls with expired tokens
|
||||||
|
|
||||||
KillApplicationRequest request = Records
|
KillApplicationRequest request = Records
|
||||||
.newRecord(KillApplicationRequest.class);
|
.newRecord(KillApplicationRequest.class);
|
||||||
|
@ -481,11 +545,9 @@ public class TestContainerManagerSecurity {
|
||||||
StartContainerRequest request = recordFactory
|
StartContainerRequest request = recordFactory
|
||||||
.newRecordInstance(StartContainerRequest.class);
|
.newRecordInstance(StartContainerRequest.class);
|
||||||
// Authenticated but unauthorized, due to wrong resource
|
// Authenticated but unauthorized, due to wrong resource
|
||||||
ContainerLaunchContext context = BuilderUtils.newContainerLaunchContext(
|
ContainerLaunchContext context =
|
||||||
tokenId.getContainerID(), "testUser", BuilderUtils.newResource(2048),
|
createContainerLaunchContextForTest(tokenId);
|
||||||
new HashMap<String, LocalResource>(), new HashMap<String, String>(),
|
context.getResource().setMemory(2048); // Set a different resource size.
|
||||||
new ArrayList<String>(), new HashMap<String, ByteBuffer>(), null,
|
|
||||||
new HashMap<ApplicationAccessType, String>());
|
|
||||||
request.setContainerLaunchContext(context);
|
request.setContainerLaunchContext(context);
|
||||||
try {
|
try {
|
||||||
client.startContainer(request);
|
client.startContainer(request);
|
||||||
|
@ -500,4 +562,17 @@ public class TestContainerManagerSecurity {
|
||||||
+ " but found " + context.getResource().toString()));
|
+ " but found " + context.getResource().toString()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ContainerLaunchContext createContainerLaunchContextForTest(
|
||||||
|
ContainerTokenIdentifier tokenId) {
|
||||||
|
ContainerLaunchContext context =
|
||||||
|
BuilderUtils.newContainerLaunchContext(tokenId.getContainerID(),
|
||||||
|
"testUser",
|
||||||
|
BuilderUtils.newResource(tokenId.getResource().getMemory()),
|
||||||
|
new HashMap<String, LocalResource>(),
|
||||||
|
new HashMap<String, String>(), new ArrayList<String>(),
|
||||||
|
new HashMap<String, ByteBuffer>(), null,
|
||||||
|
new HashMap<ApplicationAccessType, String>());
|
||||||
|
return context;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue