merge YARN-719 from trunk. Move RMIdentifier from Container to ContainerTokenIdentifier. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1487746 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f1a216b603
commit
0fb465cbc4
|
@ -1090,12 +1090,11 @@ public abstract class TaskAttemptImpl implements
|
||||||
+ taInfo.getPort());
|
+ taInfo.getPort());
|
||||||
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
||||||
+ taInfo.getHttpPort());
|
+ taInfo.getHttpPort());
|
||||||
// Resource/Priority/Tokens and RMIdentifier are only needed while
|
// Resource/Priority/Tokens are only needed while launching the container on
|
||||||
// launching the container on an NM, these are already completed tasks, so
|
// an NM, these are already completed tasks, so setting them to null
|
||||||
// setting them to null and RMIdentifier as 0
|
|
||||||
container =
|
container =
|
||||||
Container.newInstance(containerId, containerNodeId,
|
Container.newInstance(containerId, containerNodeId,
|
||||||
nodeHttpAddress, null, null, null, 0);
|
nodeHttpAddress, null, null, null);
|
||||||
computeRackAndLocality();
|
computeRackAndLocality();
|
||||||
launchTime = taInfo.getStartTime();
|
launchTime = taInfo.getStartTime();
|
||||||
finishTime = (taInfo.getFinishTime() != -1) ?
|
finishTime = (taInfo.getFinishTime() != -1) ?
|
||||||
|
|
|
@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster {
|
||||||
cId.setId(containerCount++);
|
cId.setId(containerCount++);
|
||||||
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
|
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
|
||||||
Container container = Container.newInstance(cId, nodeId,
|
Container container = Container.newInstance(cId, nodeId,
|
||||||
NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
|
NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
|
||||||
JobID id = TypeConverter.fromYarn(applicationId);
|
JobID id = TypeConverter.fromYarn(applicationId);
|
||||||
JobId jobId = TypeConverter.toYarn(id);
|
JobId jobId = TypeConverter.toYarn(id);
|
||||||
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
|
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
|
||||||
|
|
|
@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
|
@ -243,7 +242,7 @@ public class MRAppBenchmark {
|
||||||
containers.add(Container.newInstance(containerId,
|
containers.add(Container.newInstance(containerId,
|
||||||
NodeId.newInstance("host" + containerId.getId(), 2345),
|
NodeId.newInstance("host" + containerId.getId(), 2345),
|
||||||
"host" + containerId.getId() + ":5678",
|
"host" + containerId.getId() + ":5678",
|
||||||
req.getCapability(), req.getPriority(), null, 0));
|
req.getCapability(), req.getPriority(), null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -409,11 +409,12 @@ public class TestContainerLauncherImpl {
|
||||||
|
|
||||||
private ContainerToken createNewContainerToken(ContainerId contId,
|
private ContainerToken createNewContainerToken(ContainerId contId,
|
||||||
String containerManagerAddr) {
|
String containerManagerAddr) {
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
return BuilderUtils.newContainerToken(NodeId.newInstance("127.0.0.1",
|
return BuilderUtils.newContainerToken(NodeId.newInstance("127.0.0.1",
|
||||||
1234), "password".getBytes(), new ContainerTokenIdentifier(
|
1234), "password".getBytes(), new ContainerTokenIdentifier(
|
||||||
contId, containerManagerAddr, "user",
|
contId, containerManagerAddr, "user",
|
||||||
Resource.newInstance(1024, 1),
|
Resource.newInstance(1024, 1),
|
||||||
System.currentTimeMillis() + 10000L, 123));
|
currentTime + 10000L, 123, currentTime));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ContainerManagerForTest implements ContainerManager {
|
private static class ContainerManagerForTest implements ContainerManager {
|
||||||
|
|
|
@ -212,6 +212,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-512. Log aggregation root directory check is more expensive than it
|
YARN-512. Log aggregation root directory check is more expensive than it
|
||||||
needs to be. (Maysam Yabandeh via jlowe)
|
needs to be. (Maysam Yabandeh via jlowe)
|
||||||
|
|
||||||
|
YARN-719. Move RMIdentifier from Container to ContainerTokenIdentifier.
|
||||||
|
(Vinod Kumar Vavilapalli via sseth)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
YARN-383. AMRMClientImpl should handle null rmClient in stop()
|
YARN-383. AMRMClientImpl should handle null rmClient in stop()
|
||||||
|
|
|
@ -69,7 +69,7 @@ public abstract class Container implements Comparable<Container> {
|
||||||
@Private
|
@Private
|
||||||
public static Container newInstance(ContainerId containerId, NodeId nodeId,
|
public static Container newInstance(ContainerId containerId, NodeId nodeId,
|
||||||
String nodeHttpAddress, Resource resource, Priority priority,
|
String nodeHttpAddress, Resource resource, Priority priority,
|
||||||
ContainerToken containerToken, long rmIdentifier) {
|
ContainerToken containerToken) {
|
||||||
Container container = Records.newRecord(Container.class);
|
Container container = Records.newRecord(Container.class);
|
||||||
container.setId(containerId);
|
container.setId(containerId);
|
||||||
container.setNodeId(nodeId);
|
container.setNodeId(nodeId);
|
||||||
|
@ -77,7 +77,6 @@ public abstract class Container implements Comparable<Container> {
|
||||||
container.setResource(resource);
|
container.setResource(resource);
|
||||||
container.setPriority(priority);
|
container.setPriority(priority);
|
||||||
container.setContainerToken(containerToken);
|
container.setContainerToken(containerToken);
|
||||||
container.setRMIdentifier(rmIdentifier);
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,16 +151,4 @@ public abstract class Container implements Comparable<Container> {
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setContainerToken(ContainerToken containerToken);
|
public abstract void setContainerToken(ContainerToken containerToken);
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the RMIdentifier of RM in which containers are allocated
|
|
||||||
* @return RMIdentifier
|
|
||||||
*/
|
|
||||||
@Private
|
|
||||||
@Unstable
|
|
||||||
public abstract long getRMIdentifer();
|
|
||||||
|
|
||||||
@Private
|
|
||||||
@Unstable
|
|
||||||
public abstract void setRMIdentifier(long rmIdentifier);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -244,18 +244,6 @@ public class ContainerPBImpl extends Container {
|
||||||
this.containerToken = containerToken;
|
this.containerToken = containerToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getRMIdentifer() {
|
|
||||||
ContainerProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
return p.getRmIdentifier();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRMIdentifier(long rmIdentifier) {
|
|
||||||
maybeInitBuilder();
|
|
||||||
builder.setRmIdentifier((rmIdentifier));
|
|
||||||
}
|
|
||||||
|
|
||||||
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||||
return new ContainerIdPBImpl(p);
|
return new ContainerIdPBImpl(p);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,6 @@ message ContainerProto {
|
||||||
optional ResourceProto resource = 4;
|
optional ResourceProto resource = 4;
|
||||||
optional PriorityProto priority = 5;
|
optional PriorityProto priority = 5;
|
||||||
optional hadoop.common.TokenProto container_token = 6;
|
optional hadoop.common.TokenProto container_token = 6;
|
||||||
optional int64 rm_identifier = 7;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enum YarnApplicationStateProto {
|
enum YarnApplicationStateProto {
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client;
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
import static org.mockito.Mockito.anyFloat;
|
import static org.mockito.Matchers.anyFloat;
|
||||||
import static org.mockito.Mockito.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Mockito.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ public class TestAMRMClientAsync {
|
||||||
BuilderUtils.newContainerId(0, 0, 0, 0),
|
BuilderUtils.newContainerId(0, 0, 0, 0),
|
||||||
ContainerState.COMPLETE, "", 0));
|
ContainerState.COMPLETE, "", 0));
|
||||||
List<Container> allocated1 = Arrays.asList(
|
List<Container> allocated1 = Arrays.asList(
|
||||||
BuilderUtils.newContainer(null, null, null, null, null, null, 0));
|
BuilderUtils.newContainer(null, null, null, null, null, null));
|
||||||
final AllocateResponse response1 = createAllocateResponse(
|
final AllocateResponse response1 = createAllocateResponse(
|
||||||
new ArrayList<ContainerStatus>(), allocated1);
|
new ArrayList<ContainerStatus>(), allocated1);
|
||||||
final AllocateResponse response2 = createAllocateResponse(completed1,
|
final AllocateResponse response2 = createAllocateResponse(completed1,
|
||||||
|
|
|
@ -533,8 +533,8 @@ public class TestNMClientAsync {
|
||||||
nodeId = NodeId.newInstance("localhost", 0);
|
nodeId = NodeId.newInstance("localhost", 0);
|
||||||
// Create an empty record
|
// Create an empty record
|
||||||
containerToken = recordFactory.newRecordInstance(ContainerToken.class);
|
containerToken = recordFactory.newRecordInstance(ContainerToken.class);
|
||||||
return BuilderUtils.newContainer(
|
return BuilderUtils.newContainer(containerId, nodeId, null, null, null,
|
||||||
containerId, nodeId, null, null, null, containerToken, 0);
|
containerToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,15 +52,18 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
private Resource resource;
|
private Resource resource;
|
||||||
private long expiryTimeStamp;
|
private long expiryTimeStamp;
|
||||||
private int masterKeyId;
|
private int masterKeyId;
|
||||||
|
private long rmIdentifier;
|
||||||
|
|
||||||
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||||
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId) {
|
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
|
||||||
|
long rmIdentifier) {
|
||||||
this.containerId = containerID;
|
this.containerId = containerID;
|
||||||
this.nmHostAddr = hostName;
|
this.nmHostAddr = hostName;
|
||||||
this.appSubmitter = appSubmitter;
|
this.appSubmitter = appSubmitter;
|
||||||
this.resource = r;
|
this.resource = r;
|
||||||
this.expiryTimeStamp = expiryTimeStamp;
|
this.expiryTimeStamp = expiryTimeStamp;
|
||||||
this.masterKeyId = masterKeyId;
|
this.masterKeyId = masterKeyId;
|
||||||
|
this.rmIdentifier = rmIdentifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -93,6 +96,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
return this.masterKeyId;
|
return this.masterKeyId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the RMIdentifier of RM in which containers are allocated
|
||||||
|
* @return RMIdentifier
|
||||||
|
*/
|
||||||
|
public long getRMIdentifer() {
|
||||||
|
return this.rmIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
|
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
|
||||||
|
@ -109,6 +120,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
out.writeInt(this.resource.getVirtualCores());
|
out.writeInt(this.resource.getVirtualCores());
|
||||||
out.writeLong(this.expiryTimeStamp);
|
out.writeLong(this.expiryTimeStamp);
|
||||||
out.writeInt(this.masterKeyId);
|
out.writeInt(this.masterKeyId);
|
||||||
|
out.writeLong(this.rmIdentifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -126,6 +138,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
||||||
this.resource = BuilderUtils.newResource(memory, vCores);
|
this.resource = BuilderUtils.newResource(memory, vCores);
|
||||||
this.expiryTimeStamp = in.readLong();
|
this.expiryTimeStamp = in.readLong();
|
||||||
this.masterKeyId = in.readInt();
|
this.masterKeyId = in.readInt();
|
||||||
|
this.rmIdentifier = in.readLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -192,10 +192,10 @@ public class BuilderUtils {
|
||||||
|
|
||||||
public static ContainerToken newContainerToken(ContainerId cId, String host,
|
public static ContainerToken newContainerToken(ContainerId cId, String host,
|
||||||
int port, String user, Resource r, long expiryTime, int masterKeyId,
|
int port, String user, Resource r, long expiryTime, int masterKeyId,
|
||||||
byte[] password) throws IOException {
|
byte[] password, long rmIdentifier) throws IOException {
|
||||||
ContainerTokenIdentifier identifier =
|
ContainerTokenIdentifier identifier =
|
||||||
new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
|
new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
|
||||||
masterKeyId);
|
masterKeyId, rmIdentifier);
|
||||||
return newContainerToken(BuilderUtils.newNodeId(host, port), password,
|
return newContainerToken(BuilderUtils.newNodeId(host, port), password,
|
||||||
identifier);
|
identifier);
|
||||||
}
|
}
|
||||||
|
@ -253,7 +253,7 @@ public class BuilderUtils {
|
||||||
|
|
||||||
public static Container newContainer(ContainerId containerId, NodeId nodeId,
|
public static Container newContainer(ContainerId containerId, NodeId nodeId,
|
||||||
String nodeHttpAddress, Resource resource, Priority priority,
|
String nodeHttpAddress, Resource resource, Priority priority,
|
||||||
ContainerToken containerToken, long rmIdentifier) {
|
ContainerToken containerToken) {
|
||||||
Container container = recordFactory.newRecordInstance(Container.class);
|
Container container = recordFactory.newRecordInstance(Container.class);
|
||||||
container.setId(containerId);
|
container.setId(containerId);
|
||||||
container.setNodeId(nodeId);
|
container.setNodeId(nodeId);
|
||||||
|
@ -261,7 +261,6 @@ public class BuilderUtils {
|
||||||
container.setResource(resource);
|
container.setResource(resource);
|
||||||
container.setPriority(priority);
|
container.setPriority(priority);
|
||||||
container.setContainerToken(containerToken);
|
container.setContainerToken(containerToken);
|
||||||
container.setRMIdentifier(rmIdentifier);
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,7 @@ public class TestContainerLaunchRPC {
|
||||||
containerId.setId(100);
|
containerId.setId(100);
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||||
.newRecordInstance(Resource.class), null, null, 0);
|
.newRecordInstance(Resource.class), null, null);
|
||||||
|
|
||||||
StartContainerRequest scRequest = recordFactory
|
StartContainerRequest scRequest = recordFactory
|
||||||
.newRecordInstance(StartContainerRequest.class);
|
.newRecordInstance(StartContainerRequest.class);
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class TestRPC {
|
||||||
containerId.setId(100);
|
containerId.setId(100);
|
||||||
Container mockContainer =
|
Container mockContainer =
|
||||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||||
.newRecordInstance(Resource.class), null, null, 0);
|
.newRecordInstance(Resource.class), null, null);
|
||||||
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
||||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||||
|
|
||||||
|
|
|
@ -31,14 +31,9 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -169,36 +164,4 @@ public class BaseContainerTokenSecretManager extends
|
||||||
public ContainerTokenIdentifier createIdentifier() {
|
public ContainerTokenIdentifier createIdentifier() {
|
||||||
return new ContainerTokenIdentifier();
|
return new ContainerTokenIdentifier();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function for creating ContainerTokens
|
|
||||||
*
|
|
||||||
* @param containerId
|
|
||||||
* @param nodeId
|
|
||||||
* @param appSubmitter
|
|
||||||
* @param capability
|
|
||||||
* @return the container-token
|
|
||||||
*/
|
|
||||||
public ContainerToken createContainerToken(ContainerId containerId,
|
|
||||||
NodeId nodeId, String appSubmitter, Resource capability) {
|
|
||||||
byte[] password;
|
|
||||||
ContainerTokenIdentifier tokenIdentifier;
|
|
||||||
long expiryTimeStamp =
|
|
||||||
System.currentTimeMillis() + containerTokenExpiryInterval;
|
|
||||||
|
|
||||||
// Lock so that we use the same MasterKey's keyId and its bytes
|
|
||||||
this.readLock.lock();
|
|
||||||
try {
|
|
||||||
tokenIdentifier =
|
|
||||||
new ContainerTokenIdentifier(containerId, nodeId.toString(),
|
|
||||||
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
|
|
||||||
.getMasterKey().getKeyId());
|
|
||||||
password = this.createPassword(tokenIdentifier);
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
this.readLock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
@ -53,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
@ -426,7 +424,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
tokenId);
|
tokenId);
|
||||||
|
|
||||||
// Is the container coming from unknown RM
|
// Is the container coming from unknown RM
|
||||||
if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
|
if (tokenId.getRMIdentifer() != nodeStatusUpdater
|
||||||
.getRMIdentifier()) {
|
.getRMIdentifier()) {
|
||||||
String msg = "\nContainer "+ containerIDStr
|
String msg = "\nContainer "+ containerIDStr
|
||||||
+ " rejected as it is allocated by a previous RM";
|
+ " rejected as it is allocated by a previous RM";
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Log
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
|
||||||
public class DummyContainerManager extends ContainerManagerImpl {
|
public class DummyContainerManager extends ContainerManagerImpl {
|
||||||
|
|
||||||
|
@ -192,8 +194,14 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
||||||
UserGroupInformation remoteUgi,
|
UserGroupInformation remoteUgi,
|
||||||
org.apache.hadoop.yarn.api.records.Container container)
|
org.apache.hadoop.yarn.api.records.Container container)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
return new ContainerTokenIdentifier(container.getId(),
|
try {
|
||||||
container.getNodeHttpAddress(), remoteUgi.getUserName(),
|
return new ContainerTokenIdentifier(container.getId(),
|
||||||
container.getResource(), System.currentTimeMillis(), 123);
|
container.getNodeHttpAddress(), remoteUgi.getUserName(),
|
||||||
|
container.getResource(), System.currentTimeMillis() + 100000l, 123,
|
||||||
|
BuilderUtils.newContainerTokenIdentifier(
|
||||||
|
container.getContainerToken()).getRMIdentifer());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new YarnRemoteException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -139,13 +139,13 @@ public class TestEventFlow {
|
||||||
when(mockContainer.getId()).thenReturn(cID);
|
when(mockContainer.getId()).thenReturn(cID);
|
||||||
Resource r = BuilderUtils.newResource(1024, 1);
|
Resource r = BuilderUtils.newResource(1024, 1);
|
||||||
when(mockContainer.getResource()).thenReturn(r);
|
when(mockContainer.getResource()).thenReturn(r);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(SIMULATED_RM_IDENTIFIER);
|
|
||||||
String user = "testing";
|
String user = "testing";
|
||||||
String host = "127.0.0.1";
|
String host = "127.0.0.1";
|
||||||
int port = 1234;
|
int port = 1234;
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cID, host, port, user, r,
|
BuilderUtils.newContainerToken(cID, host, port, user, r,
|
||||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes());
|
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||||
|
SIMULATED_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
StartContainerRequest request =
|
StartContainerRequest request =
|
||||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
@ -137,9 +138,11 @@ public class TestNodeManagerReboot {
|
||||||
resource.setMemory(1024);
|
resource.setMemory(1024);
|
||||||
mockContainer.setResource(resource);
|
mockContainer.setResource(resource);
|
||||||
NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
|
NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
|
||||||
mockContainer.setContainerToken(nm.getNMContext()
|
ContainerToken containerToken =
|
||||||
.getContainerTokenSecretManager()
|
BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
|
||||||
.createContainerToken(cId, nodeId, user, resource));
|
user, resource, System.currentTimeMillis() + 10000L, 123,
|
||||||
|
"password".getBytes(), 0);
|
||||||
|
mockContainer.setContainerToken(containerToken);
|
||||||
mockContainer.setNodeHttpAddress("127.0.0.1");
|
mockContainer.setNodeHttpAddress("127.0.0.1");
|
||||||
mockContainer.setNodeId(nodeId);
|
mockContainer.setNodeId(nodeId);
|
||||||
|
|
||||||
|
|
|
@ -282,7 +282,7 @@ public class TestNodeManagerResync {
|
||||||
while (!isStopped && numContainers < 10) {
|
while (!isStopped && numContainers < 10) {
|
||||||
ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
||||||
Container container =
|
Container container =
|
||||||
BuilderUtils.newContainer(cId, null, null, null, null, null, 0);
|
BuilderUtils.newContainer(cId, null, null, null, null, null);
|
||||||
StartContainerRequest startRequest =
|
StartContainerRequest startRequest =
|
||||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
|
|
@ -184,8 +184,11 @@ public class TestNodeManagerShutdown {
|
||||||
containerLaunchContext.setCommands(commands);
|
containerLaunchContext.setCommands(commands);
|
||||||
Resource resource = BuilderUtils.newResource(1024, 1);
|
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||||
mockContainer.setResource(resource);
|
mockContainer.setResource(resource);
|
||||||
mockContainer.setContainerToken(getContainerToken(nm, cId, nodeId,
|
ContainerToken containerToken =
|
||||||
cId.toString(), resource));
|
BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
|
||||||
|
user, resource, System.currentTimeMillis() + 10000L, 123,
|
||||||
|
"password".getBytes(), 0);
|
||||||
|
mockContainer.setContainerToken(containerToken);
|
||||||
StartContainerRequest startRequest =
|
StartContainerRequest startRequest =
|
||||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
@ -269,12 +272,6 @@ public class TestNodeManagerShutdown {
|
||||||
return scriptFile;
|
return scriptFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ContainerToken getContainerToken(NodeManager nm,
|
|
||||||
ContainerId containerId, NodeId nodeId, String user, Resource resource) {
|
|
||||||
return nm.getNMContext().getContainerTokenSecretManager()
|
|
||||||
.createContainerToken(containerId, nodeId, user, resource);
|
|
||||||
}
|
|
||||||
|
|
||||||
class TestNodeManager extends NodeManager {
|
class TestNodeManager extends NodeManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -124,8 +124,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
fileWriter.write("Hello World!");
|
fileWriter.write("Hello World!");
|
||||||
fileWriter.close();
|
fileWriter.close();
|
||||||
|
|
||||||
ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
||||||
|
|
||||||
// ////// Construct the Container-id
|
// ////// Construct the Container-id
|
||||||
ContainerId cId = createContainerId();
|
ContainerId cId = createContainerId();
|
||||||
|
|
||||||
|
@ -155,11 +153,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
int port = 12345;
|
int port = 12345;
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
StartContainerRequest startRequest =
|
StartContainerRequest startRequest =
|
||||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
|
@ -256,11 +253,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
int port = 12345;
|
int port = 12345;
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
|
|
||||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
|
@ -372,11 +368,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
@ -438,8 +433,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
fileWriter.write("Hello World!");
|
fileWriter.write("Hello World!");
|
||||||
fileWriter.close();
|
fileWriter.close();
|
||||||
|
|
||||||
ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
||||||
|
|
||||||
// ////// Construct the Container-id
|
// ////// Construct the Container-id
|
||||||
ContainerId cId = createContainerId();
|
ContainerId cId = createContainerId();
|
||||||
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
|
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
|
||||||
|
@ -470,13 +463,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
int port = 12345;
|
int port = 12345;
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
|
|
||||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
|
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
request.setContainerLaunchContext(containerLaunchContext);
|
request.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
@ -562,15 +554,14 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
Container mockContainer1 = mock(Container.class);
|
Container mockContainer1 = mock(Container.class);
|
||||||
when(mockContainer1.getId()).thenReturn(cId1);
|
when(mockContainer1.getId()).thenReturn(cId1);
|
||||||
// Construct the Container with Invalid RMIdentifier
|
// Construct the Container with Invalid RMIdentifier
|
||||||
when(mockContainer1.getRMIdentifer()).thenReturn(
|
|
||||||
(long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
|
|
||||||
StartContainerRequest startRequest1 =
|
StartContainerRequest startRequest1 =
|
||||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
startRequest1.setContainerLaunchContext(containerLaunchContext);
|
startRequest1.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
|
||||||
ContainerToken containerToken1 =
|
ContainerToken containerToken1 =
|
||||||
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
||||||
System.currentTimeMillis() + 10000, 123, "password".getBytes());
|
System.currentTimeMillis() + 10000, 123, "password".getBytes(),
|
||||||
|
(long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
|
||||||
when(mockContainer1.getContainerToken()).thenReturn(containerToken1);
|
when(mockContainer1.getContainerToken()).thenReturn(containerToken1);
|
||||||
startRequest1.setContainer(mockContainer1);
|
startRequest1.setContainer(mockContainer1);
|
||||||
boolean catchException = false;
|
boolean catchException = false;
|
||||||
|
@ -592,7 +583,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
// Construct the Container with a RMIdentifier within current RM
|
// Construct the Container with a RMIdentifier within current RM
|
||||||
Container mockContainer2 = mock(Container.class);
|
Container mockContainer2 = mock(Container.class);
|
||||||
when(mockContainer2.getId()).thenReturn(cId2);
|
when(mockContainer2.getId()).thenReturn(cId2);
|
||||||
when(mockContainer2.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
|
|
||||||
when(mockContainer2.getResource()).thenReturn(mockResource);
|
when(mockContainer2.getResource()).thenReturn(mockResource);
|
||||||
StartContainerRequest startRequest2 =
|
StartContainerRequest startRequest2 =
|
||||||
|
@ -600,7 +590,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
startRequest2.setContainerLaunchContext(containerLaunchContext);
|
startRequest2.setContainerLaunchContext(containerLaunchContext);
|
||||||
ContainerToken containerToken2 =
|
ContainerToken containerToken2 =
|
||||||
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
||||||
System.currentTimeMillis() + 10000, 123, "password".getBytes());
|
System.currentTimeMillis() + 10000, 123, "password".getBytes(),
|
||||||
|
super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer2.getContainerToken()).thenReturn(containerToken2);
|
when(mockContainer2.getContainerToken()).thenReturn(containerToken2);
|
||||||
|
|
||||||
startRequest2.setContainer(mockContainer2);
|
startRequest2.setContainer(mockContainer2);
|
||||||
|
|
|
@ -485,9 +485,10 @@ public class TestApplication {
|
||||||
for (int i = 0; i < numContainers; i++) {
|
for (int i = 0; i < numContainers; i++) {
|
||||||
Container container = createMockedContainer(this.appId, i);
|
Container container = createMockedContainer(this.appId, i);
|
||||||
containers.add(container);
|
containers.add(container);
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
context.getContainerTokenSecretManager().startContainerSuccessful(
|
context.getContainerTokenSecretManager().startContainerSuccessful(
|
||||||
new ContainerTokenIdentifier(container.getContainer().getId(), "",
|
new ContainerTokenIdentifier(container.getContainer().getId(), "",
|
||||||
"", null, System.currentTimeMillis() + 1000, masterKey.getKeyId()));
|
"", null, currentTime + 1000, masterKey.getKeyId(), currentTime));
|
||||||
Assert.assertFalse(context.getContainerTokenSecretManager()
|
Assert.assertFalse(context.getContainerTokenSecretManager()
|
||||||
.isValidStartContainerRequest(container.getContainer().getId()));
|
.isValidStartContainerRequest(container.getContainer().getId()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -548,7 +548,6 @@ public class TestContainer {
|
||||||
final Container c;
|
final Container c;
|
||||||
final Map<String, LocalResource> localResources;
|
final Map<String, LocalResource> localResources;
|
||||||
final Map<String, ByteBuffer> serviceData;
|
final Map<String, ByteBuffer> serviceData;
|
||||||
final String user;
|
|
||||||
|
|
||||||
WrappedContainer(int appId, long timestamp, int id, String user)
|
WrappedContainer(int appId, long timestamp, int id, String user)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -572,7 +571,6 @@ public class TestContainer {
|
||||||
dispatcher.register(AuxServicesEventType.class, auxBus);
|
dispatcher.register(AuxServicesEventType.class, auxBus);
|
||||||
dispatcher.register(ApplicationEventType.class, appBus);
|
dispatcher.register(ApplicationEventType.class, appBus);
|
||||||
dispatcher.register(LogHandlerEventType.class, LogBus);
|
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||||
this.user = user;
|
|
||||||
|
|
||||||
ctxt = mock(ContainerLaunchContext.class);
|
ctxt = mock(ContainerLaunchContext.class);
|
||||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||||
|
@ -584,9 +582,10 @@ public class TestContainer {
|
||||||
when(mockContainer.getResource()).thenReturn(resource);
|
when(mockContainer.getResource()).thenReturn(resource);
|
||||||
String host = "127.0.0.1";
|
String host = "127.0.0.1";
|
||||||
int port = 1234;
|
int port = 1234;
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
ContainerTokenIdentifier identifier =
|
ContainerTokenIdentifier identifier =
|
||||||
new ContainerTokenIdentifier(cId, "127.0.0.1", user, resource,
|
new ContainerTokenIdentifier(cId, "127.0.0.1", user, resource,
|
||||||
System.currentTimeMillis() + 10000L, 123);
|
currentTime + 10000L, 123, currentTime);
|
||||||
ContainerToken token =
|
ContainerToken token =
|
||||||
BuilderUtils.newContainerToken(BuilderUtils.newNodeId(host, port),
|
BuilderUtils.newContainerToken(BuilderUtils.newNodeId(host, port),
|
||||||
"password".getBytes(), identifier);
|
"password".getBytes(), identifier);
|
||||||
|
|
|
@ -183,7 +183,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
|
|
||||||
Map<String, String> userSetEnv = new HashMap<String, String>();
|
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||||
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
||||||
|
@ -253,7 +252,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 1234,
|
port, user, r, System.currentTimeMillis() + 10000L, 1234,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
startRequest.setContainer(mockContainer);
|
startRequest.setContainer(mockContainer);
|
||||||
containerManager.startContainer(startRequest);
|
containerManager.startContainer(startRequest);
|
||||||
|
@ -384,7 +383,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
int port = 12345;
|
int port = 12345;
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
|
|
||||||
// upload the script file so that the container can run it
|
// upload the script file so that the container can run it
|
||||||
URL resource_alpha =
|
URL resource_alpha =
|
||||||
|
@ -411,7 +409,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
|
|
@ -48,15 +48,13 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
@ -74,10 +72,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
|
@ -721,7 +718,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
|
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
|
||||||
|
|
||||||
when(mockContainer.getId()).thenReturn(cId);
|
when(mockContainer.getId()).thenReturn(cId);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
|
||||||
|
|
||||||
URL resource_alpha =
|
URL resource_alpha =
|
||||||
ConverterUtils.getYarnUrlFromPath(localFS
|
ConverterUtils.getYarnUrlFromPath(localFS
|
||||||
|
@ -746,7 +742,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
when(mockContainer.getResource()).thenReturn(r);
|
when(mockContainer.getResource()).thenReturn(r);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(
|
when(mockContainer.getContainerToken()).thenReturn(
|
||||||
BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
|
BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
|
||||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes()));
|
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||||
|
super.DUMMY_RM_IDENTIFIER));
|
||||||
StartContainerRequest startRequest =
|
StartContainerRequest startRequest =
|
||||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
|
|
|
@ -21,7 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -220,8 +221,6 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
||||||
int port = 12345;
|
int port = 12345;
|
||||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||||
context.getNodeId().getHost() + ":" + port);
|
context.getNodeId().getHost() + ":" + port);
|
||||||
when(mockContainer.getRMIdentifer()).thenReturn(
|
|
||||||
super.DUMMY_RM_IDENTIFIER);
|
|
||||||
|
|
||||||
URL resource_alpha =
|
URL resource_alpha =
|
||||||
ConverterUtils.getYarnUrlFromPath(localFS
|
ConverterUtils.getYarnUrlFromPath(localFS
|
||||||
|
@ -250,7 +249,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||||
"password".getBytes());
|
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
startRequest.setContainer(mockContainer);
|
startRequest.setContainer(mockContainer);
|
||||||
containerManager.startContainer(startRequest);
|
containerManager.startContainer(startRequest);
|
||||||
|
|
|
@ -64,10 +64,11 @@ public class MockContainer implements Container {
|
||||||
uniqId);
|
uniqId);
|
||||||
this.launchContext = recordFactory
|
this.launchContext = recordFactory
|
||||||
.newRecordInstance(ContainerLaunchContext.class);
|
.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(id, "127.0.0.1", 1234, user,
|
BuilderUtils.newContainerToken(id, "127.0.0.1", 1234, user,
|
||||||
BuilderUtils.newResource(1024, 1),
|
BuilderUtils.newResource(1024, 1), currentTime + 10000, 123,
|
||||||
System.currentTimeMillis() + 10000, 123, "password".getBytes());
|
"password".getBytes(), currentTime);
|
||||||
this.state = ContainerState.NEW;
|
this.state = ContainerState.NEW;
|
||||||
|
|
||||||
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
|
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -183,10 +182,11 @@ public class TestNMWebServer {
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
|
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
|
||||||
BuilderUtils.newResource(1024, 1),
|
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
|
||||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes());
|
"password".getBytes(), currentTime);
|
||||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||||
when(mockContainer.getId()).thenReturn(containerId);
|
when(mockContainer.getId()).thenReturn(containerId);
|
||||||
Container container =
|
Container container =
|
||||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
@ -1242,9 +1241,9 @@ public class LeafQueue implements CSQueue {
|
||||||
.getApplicationAttemptId(), application.getNewContainerId());
|
.getApplicationAttemptId(), application.getNewContainerId());
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container =
|
||||||
node.getRMNode().getHttpAddress(), capability, priority,
|
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||||
null, ResourceManager.clusterTimeStamp);
|
.getHttpAddress(), capability, priority, null);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
@ -160,15 +159,15 @@ public class AppSchedulable extends Schedulable {
|
||||||
.getApplicationAttemptId(), application.getNewContainerId());
|
.getApplicationAttemptId(), application.getNewContainerId());
|
||||||
ContainerToken containerToken =
|
ContainerToken containerToken =
|
||||||
containerTokenSecretManager.createContainerToken(containerId, nodeId,
|
containerTokenSecretManager.createContainerToken(containerId, nodeId,
|
||||||
application.getUser(), capability);
|
application.getUser(), capability);
|
||||||
if (containerToken == null) {
|
if (containerToken == null) {
|
||||||
return null; // Try again later.
|
return null; // Try again later.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container =
|
||||||
node.getRMNode().getHttpAddress(), capability, priority,
|
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||||
containerToken, ResourceManager.clusterTimeStamp);
|
.getHttpAddress(), capability, priority, containerToken);
|
||||||
|
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||||
|
@ -554,16 +553,16 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
|
|
||||||
containerToken =
|
containerToken =
|
||||||
this.rmContext.getContainerTokenSecretManager()
|
this.rmContext.getContainerTokenSecretManager()
|
||||||
.createContainerToken(containerId, nodeId,
|
.createContainerToken(containerId, nodeId, application.getUser(),
|
||||||
application.getUser(), capability);
|
capability);
|
||||||
if (containerToken == null) {
|
if (containerToken == null) {
|
||||||
return i; // Try again later.
|
return i; // Try again later.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container =
|
||||||
node.getRMNode().getHttpAddress(), capability, priority,
|
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||||
containerToken, ResourceManager.clusterTimeStamp);
|
.getHttpAddress(), capability, priority, containerToken);
|
||||||
|
|
||||||
// Allocate!
|
// Allocate!
|
||||||
|
|
||||||
|
|
|
@ -25,9 +25,16 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* SecretManager for ContainerTokens. This is RM-specific and rolls the
|
* SecretManager for ContainerTokens. This is RM-specific and rolls the
|
||||||
|
@ -153,4 +160,37 @@ public class RMContainerTokenSecretManager extends
|
||||||
activateNextMasterKey();
|
activateNextMasterKey();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper function for creating ContainerTokens
|
||||||
|
*
|
||||||
|
* @param containerId
|
||||||
|
* @param nodeId
|
||||||
|
* @param appSubmitter
|
||||||
|
* @param capability
|
||||||
|
* @return the container-token
|
||||||
|
*/
|
||||||
|
public ContainerToken
|
||||||
|
createContainerToken(ContainerId containerId, NodeId nodeId,
|
||||||
|
String appSubmitter, Resource capability) {
|
||||||
|
byte[] password;
|
||||||
|
ContainerTokenIdentifier tokenIdentifier;
|
||||||
|
long expiryTimeStamp =
|
||||||
|
System.currentTimeMillis() + containerTokenExpiryInterval;
|
||||||
|
|
||||||
|
// Lock so that we use the same MasterKey's keyId and its bytes
|
||||||
|
this.readLock.lock();
|
||||||
|
try {
|
||||||
|
tokenIdentifier =
|
||||||
|
new ContainerTokenIdentifier(containerId, nodeId.toString(),
|
||||||
|
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
|
||||||
|
.getMasterKey().getKeyId(), ResourceManager.clusterTimeStamp);
|
||||||
|
password = this.createPassword(tokenIdentifier);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
this.readLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -188,7 +188,6 @@ public class NodeManager implements ContainerManager {
|
||||||
this.nodeId, nodeHttpAddress,
|
this.nodeId, nodeHttpAddress,
|
||||||
requestContainer.getResource(),
|
requestContainer.getResource(),
|
||||||
null, null // DKDC - Doesn't matter
|
null, null // DKDC - Doesn't matter
|
||||||
, 0
|
|
||||||
);
|
);
|
||||||
|
|
||||||
ContainerStatus containerStatus =
|
ContainerStatus containerStatus =
|
||||||
|
|
|
@ -23,7 +23,9 @@ import junit.framework.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -77,8 +80,12 @@ public class TestApplicationMasterService {
|
||||||
}
|
}
|
||||||
|
|
||||||
// assert RMIdentifer is set properly in allocated containers
|
// assert RMIdentifer is set properly in allocated containers
|
||||||
Assert.assertEquals(rm.clusterTimeStamp, alloc1Response
|
Container allocatedContainer =
|
||||||
.getAllocatedContainers().get(0).getRMIdentifer());
|
alloc1Response.getAllocatedContainers().get(0);
|
||||||
|
ContainerTokenIdentifier tokenId =
|
||||||
|
BuilderUtils.newContainerTokenIdentifier(allocatedContainer
|
||||||
|
.getContainerToken());
|
||||||
|
Assert.assertEquals(MockRM.clusterTimeStamp, tokenId.getRMIdentifer());
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class TestRMContainerImpl {
|
||||||
Priority priority = BuilderUtils.newPriority(5);
|
Priority priority = BuilderUtils.newPriority(5);
|
||||||
|
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
"host:3465", resource, priority, null, 0);
|
"host:3465", resource, priority, null);
|
||||||
|
|
||||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||||
nodeId, eventHandler, expirer);
|
nodeId, eventHandler, expirer);
|
||||||
|
@ -139,7 +139,7 @@ public class TestRMContainerImpl {
|
||||||
Priority priority = BuilderUtils.newPriority(5);
|
Priority priority = BuilderUtils.newPriority(5);
|
||||||
|
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||||
"host:3465", resource, priority, null, 0);
|
"host:3465", resource, priority, null);
|
||||||
|
|
||||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||||
nodeId, eventHandler, expirer);
|
nodeId, eventHandler, expirer);
|
||||||
|
|
|
@ -223,7 +223,8 @@ public class TestContainerManagerSecurity {
|
||||||
ContainerTokenIdentifier modifiedIdentifier =
|
ContainerTokenIdentifier modifiedIdentifier =
|
||||||
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
|
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
|
||||||
dummyIdentifier.getNmHostAddress(), "testUser", modifiedResource,
|
dummyIdentifier.getNmHostAddress(), "testUser", modifiedResource,
|
||||||
Long.MAX_VALUE, dummyIdentifier.getMasterKeyId());
|
Long.MAX_VALUE, dummyIdentifier.getMasterKeyId(),
|
||||||
|
ResourceManager.clusterTimeStamp);
|
||||||
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
||||||
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
||||||
new Text(containerToken.getKind()), new Text(containerToken
|
new Text(containerToken.getKind()), new Text(containerToken
|
||||||
|
@ -340,10 +341,10 @@ public class TestContainerManagerSecurity {
|
||||||
resourceManager.getRMContainerTokenSecretManager();
|
resourceManager.getRMContainerTokenSecretManager();
|
||||||
final ContainerTokenIdentifier newTokenId =
|
final ContainerTokenIdentifier newTokenId =
|
||||||
new ContainerTokenIdentifier(tokenId.getContainerID(),
|
new ContainerTokenIdentifier(tokenId.getContainerID(),
|
||||||
tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
|
tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
|
||||||
tokenId.getResource(),
|
tokenId.getResource(), System.currentTimeMillis() - 1,
|
||||||
System.currentTimeMillis() - 1,
|
containerTokenSecreteManager.getCurrentKey().getKeyId(),
|
||||||
containerTokenSecreteManager.getCurrentKey().getKeyId());
|
ResourceManager.clusterTimeStamp);
|
||||||
final byte[] passowrd =
|
final byte[] passowrd =
|
||||||
containerTokenSecreteManager.createPassword(
|
containerTokenSecreteManager.createPassword(
|
||||||
newTokenId);
|
newTokenId);
|
||||||
|
|
Loading…
Reference in New Issue