YARN-719. Move RMIdentifier from Container to ContainerTokenIdentifier. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9ae713ef58
commit
b16c5638b5
|
@ -1090,12 +1090,11 @@ public abstract class TaskAttemptImpl implements
|
|||
+ taInfo.getPort());
|
||||
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
||||
+ taInfo.getHttpPort());
|
||||
// Resource/Priority/Tokens and RMIdentifier are only needed while
|
||||
// launching the container on an NM, these are already completed tasks, so
|
||||
// setting them to null and RMIdentifier as 0
|
||||
// Resource/Priority/Tokens are only needed while launching the container on
|
||||
// an NM, these are already completed tasks, so setting them to null
|
||||
container =
|
||||
Container.newInstance(containerId, containerNodeId,
|
||||
nodeHttpAddress, null, null, null, 0);
|
||||
nodeHttpAddress, null, null, null);
|
||||
computeRackAndLocality();
|
||||
launchTime = taInfo.getStartTime();
|
||||
finishTime = (taInfo.getFinishTime() != -1) ?
|
||||
|
|
|
@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster {
|
|||
cId.setId(containerCount++);
|
||||
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
|
||||
Container container = Container.newInstance(cId, nodeId,
|
||||
NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
|
||||
NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
|
||||
JobID id = TypeConverter.fromYarn(applicationId);
|
||||
JobId jobId = TypeConverter.toYarn(id);
|
||||
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
|
||||
|
|
|
@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
|
@ -243,7 +242,7 @@ public class MRAppBenchmark {
|
|||
containers.add(Container.newInstance(containerId,
|
||||
NodeId.newInstance("host" + containerId.getId(), 2345),
|
||||
"host" + containerId.getId() + ":5678",
|
||||
req.getCapability(), req.getPriority(), null, 0));
|
||||
req.getCapability(), req.getPriority(), null));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -409,11 +409,12 @@ public class TestContainerLauncherImpl {
|
|||
|
||||
private ContainerToken createNewContainerToken(ContainerId contId,
|
||||
String containerManagerAddr) {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
return BuilderUtils.newContainerToken(NodeId.newInstance("127.0.0.1",
|
||||
1234), "password".getBytes(), new ContainerTokenIdentifier(
|
||||
contId, containerManagerAddr, "user",
|
||||
Resource.newInstance(1024, 1),
|
||||
System.currentTimeMillis() + 10000L, 123));
|
||||
currentTime + 10000L, 123, currentTime));
|
||||
}
|
||||
|
||||
private static class ContainerManagerForTest implements ContainerManager {
|
||||
|
|
|
@ -232,6 +232,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
YARN-512. Log aggregation root directory check is more expensive than it
|
||||
needs to be. (Maysam Yabandeh via jlowe)
|
||||
|
||||
YARN-719. Move RMIdentifier from Container to ContainerTokenIdentifier.
|
||||
(Vinod Kumar Vavilapalli via sseth)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
YARN-383. AMRMClientImpl should handle null rmClient in stop()
|
||||
|
|
|
@ -69,7 +69,7 @@ public abstract class Container implements Comparable<Container> {
|
|||
@Private
|
||||
public static Container newInstance(ContainerId containerId, NodeId nodeId,
|
||||
String nodeHttpAddress, Resource resource, Priority priority,
|
||||
ContainerToken containerToken, long rmIdentifier) {
|
||||
ContainerToken containerToken) {
|
||||
Container container = Records.newRecord(Container.class);
|
||||
container.setId(containerId);
|
||||
container.setNodeId(nodeId);
|
||||
|
@ -77,7 +77,6 @@ public abstract class Container implements Comparable<Container> {
|
|||
container.setResource(resource);
|
||||
container.setPriority(priority);
|
||||
container.setContainerToken(containerToken);
|
||||
container.setRMIdentifier(rmIdentifier);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -152,16 +151,4 @@ public abstract class Container implements Comparable<Container> {
|
|||
@Private
|
||||
@Unstable
|
||||
public abstract void setContainerToken(ContainerToken containerToken);
|
||||
|
||||
/**
|
||||
* Get the RMIdentifier of RM in which containers are allocated
|
||||
* @return RMIdentifier
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract long getRMIdentifer();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setRMIdentifier(long rmIdentifier);
|
||||
}
|
||||
|
|
|
@ -244,18 +244,6 @@ public class ContainerPBImpl extends Container {
|
|||
this.containerToken = containerToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRMIdentifer() {
|
||||
ContainerProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return p.getRmIdentifier();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRMIdentifier(long rmIdentifier) {
|
||||
maybeInitBuilder();
|
||||
builder.setRmIdentifier((rmIdentifier));
|
||||
}
|
||||
|
||||
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||
return new ContainerIdPBImpl(p);
|
||||
}
|
||||
|
|
|
@ -68,7 +68,6 @@ message ContainerProto {
|
|||
optional ResourceProto resource = 4;
|
||||
optional PriorityProto priority = 5;
|
||||
optional hadoop.common.TokenProto container_token = 6;
|
||||
optional int64 rm_identifier = 7;
|
||||
}
|
||||
|
||||
enum YarnApplicationStateProto {
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import static org.mockito.Mockito.anyFloat;
|
||||
import static org.mockito.Mockito.anyInt;
|
||||
import static org.mockito.Mockito.anyString;
|
||||
import static org.mockito.Matchers.anyFloat;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -56,7 +56,7 @@ public class TestAMRMClientAsync {
|
|||
BuilderUtils.newContainerId(0, 0, 0, 0),
|
||||
ContainerState.COMPLETE, "", 0));
|
||||
List<Container> allocated1 = Arrays.asList(
|
||||
BuilderUtils.newContainer(null, null, null, null, null, null, 0));
|
||||
BuilderUtils.newContainer(null, null, null, null, null, null));
|
||||
final AllocateResponse response1 = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), allocated1);
|
||||
final AllocateResponse response2 = createAllocateResponse(completed1,
|
||||
|
|
|
@ -533,8 +533,8 @@ public class TestNMClientAsync {
|
|||
nodeId = NodeId.newInstance("localhost", 0);
|
||||
// Create an empty record
|
||||
containerToken = recordFactory.newRecordInstance(ContainerToken.class);
|
||||
return BuilderUtils.newContainer(
|
||||
containerId, nodeId, null, null, null, containerToken, 0);
|
||||
return BuilderUtils.newContainer(containerId, nodeId, null, null, null,
|
||||
containerToken);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -52,15 +52,18 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
|||
private Resource resource;
|
||||
private long expiryTimeStamp;
|
||||
private int masterKeyId;
|
||||
private long rmIdentifier;
|
||||
|
||||
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId) {
|
||||
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
|
||||
long rmIdentifier) {
|
||||
this.containerId = containerID;
|
||||
this.nmHostAddr = hostName;
|
||||
this.appSubmitter = appSubmitter;
|
||||
this.resource = r;
|
||||
this.expiryTimeStamp = expiryTimeStamp;
|
||||
this.masterKeyId = masterKeyId;
|
||||
this.rmIdentifier = rmIdentifier;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -93,6 +96,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
|||
return this.masterKeyId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the RMIdentifier of RM in which containers are allocated
|
||||
* @return RMIdentifier
|
||||
*/
|
||||
public long getRMIdentifer() {
|
||||
return this.rmIdentifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
|
||||
|
@ -109,6 +120,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
|||
out.writeInt(this.resource.getVirtualCores());
|
||||
out.writeLong(this.expiryTimeStamp);
|
||||
out.writeInt(this.masterKeyId);
|
||||
out.writeLong(this.rmIdentifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -126,6 +138,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
|
|||
this.resource = BuilderUtils.newResource(memory, vCores);
|
||||
this.expiryTimeStamp = in.readLong();
|
||||
this.masterKeyId = in.readInt();
|
||||
this.rmIdentifier = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -192,10 +192,10 @@ public class BuilderUtils {
|
|||
|
||||
public static ContainerToken newContainerToken(ContainerId cId, String host,
|
||||
int port, String user, Resource r, long expiryTime, int masterKeyId,
|
||||
byte[] password) throws IOException {
|
||||
byte[] password, long rmIdentifier) throws IOException {
|
||||
ContainerTokenIdentifier identifier =
|
||||
new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
|
||||
masterKeyId);
|
||||
masterKeyId, rmIdentifier);
|
||||
return newContainerToken(BuilderUtils.newNodeId(host, port), password,
|
||||
identifier);
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ public class BuilderUtils {
|
|||
|
||||
public static Container newContainer(ContainerId containerId, NodeId nodeId,
|
||||
String nodeHttpAddress, Resource resource, Priority priority,
|
||||
ContainerToken containerToken, long rmIdentifier) {
|
||||
ContainerToken containerToken) {
|
||||
Container container = recordFactory.newRecordInstance(Container.class);
|
||||
container.setId(containerId);
|
||||
container.setNodeId(nodeId);
|
||||
|
@ -261,7 +261,6 @@ public class BuilderUtils {
|
|||
container.setResource(resource);
|
||||
container.setPriority(priority);
|
||||
container.setContainerToken(containerToken);
|
||||
container.setRMIdentifier(rmIdentifier);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ public class TestContainerLaunchRPC {
|
|||
containerId.setId(100);
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||
.newRecordInstance(Resource.class), null, null, 0);
|
||||
.newRecordInstance(Resource.class), null, null);
|
||||
|
||||
StartContainerRequest scRequest = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
|
|
|
@ -125,7 +125,7 @@ public class TestRPC {
|
|||
containerId.setId(100);
|
||||
Container mockContainer =
|
||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||
.newRecordInstance(Resource.class), null, null, 0);
|
||||
.newRecordInstance(Resource.class), null, null);
|
||||
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
|
||||
|
|
|
@ -31,14 +31,9 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
|
@ -169,36 +164,4 @@ public class BaseContainerTokenSecretManager extends
|
|||
public ContainerTokenIdentifier createIdentifier() {
|
||||
return new ContainerTokenIdentifier();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for creating ContainerTokens
|
||||
*
|
||||
* @param containerId
|
||||
* @param nodeId
|
||||
* @param appSubmitter
|
||||
* @param capability
|
||||
* @return the container-token
|
||||
*/
|
||||
public ContainerToken createContainerToken(ContainerId containerId,
|
||||
NodeId nodeId, String appSubmitter, Resource capability) {
|
||||
byte[] password;
|
||||
ContainerTokenIdentifier tokenIdentifier;
|
||||
long expiryTimeStamp =
|
||||
System.currentTimeMillis() + containerTokenExpiryInterval;
|
||||
|
||||
// Lock so that we use the same MasterKey's keyId and its bytes
|
||||
this.readLock.lock();
|
||||
try {
|
||||
tokenIdentifier =
|
||||
new ContainerTokenIdentifier(containerId, nodeId.toString(),
|
||||
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
|
||||
.getMasterKey().getKeyId());
|
||||
password = this.createPassword(tokenIdentifier);
|
||||
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
||||
return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -53,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
|
@ -426,7 +424,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
tokenId);
|
||||
|
||||
// Is the container coming from unknown RM
|
||||
if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
|
||||
if (tokenId.getRMIdentifer() != nodeStatusUpdater
|
||||
.getRMIdentifier()) {
|
||||
String msg = "\nContainer "+ containerIDStr
|
||||
+ " rejected as it is allocated by a previous RM";
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Log
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
public class DummyContainerManager extends ContainerManagerImpl {
|
||||
|
||||
|
@ -192,8 +194,14 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
UserGroupInformation remoteUgi,
|
||||
org.apache.hadoop.yarn.api.records.Container container)
|
||||
throws YarnRemoteException {
|
||||
return new ContainerTokenIdentifier(container.getId(),
|
||||
container.getNodeHttpAddress(), remoteUgi.getUserName(),
|
||||
container.getResource(), System.currentTimeMillis(), 123);
|
||||
try {
|
||||
return new ContainerTokenIdentifier(container.getId(),
|
||||
container.getNodeHttpAddress(), remoteUgi.getUserName(),
|
||||
container.getResource(), System.currentTimeMillis() + 100000l, 123,
|
||||
BuilderUtils.newContainerTokenIdentifier(
|
||||
container.getContainerToken()).getRMIdentifer());
|
||||
} catch (IOException e) {
|
||||
throw new YarnRemoteException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -139,13 +139,13 @@ public class TestEventFlow {
|
|||
when(mockContainer.getId()).thenReturn(cID);
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(SIMULATED_RM_IDENTIFIER);
|
||||
String user = "testing";
|
||||
String host = "127.0.0.1";
|
||||
int port = 1234;
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cID, host, port, user, r,
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes());
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||
SIMULATED_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest request =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
|
@ -137,9 +138,11 @@ public class TestNodeManagerReboot {
|
|||
resource.setMemory(1024);
|
||||
mockContainer.setResource(resource);
|
||||
NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
|
||||
mockContainer.setContainerToken(nm.getNMContext()
|
||||
.getContainerTokenSecretManager()
|
||||
.createContainerToken(cId, nodeId, user, resource));
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
|
||||
user, resource, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), 0);
|
||||
mockContainer.setContainerToken(containerToken);
|
||||
mockContainer.setNodeHttpAddress("127.0.0.1");
|
||||
mockContainer.setNodeId(nodeId);
|
||||
|
||||
|
|
|
@ -282,7 +282,7 @@ public class TestNodeManagerResync {
|
|||
while (!isStopped && numContainers < 10) {
|
||||
ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
||||
Container container =
|
||||
BuilderUtils.newContainer(cId, null, null, null, null, null, 0);
|
||||
BuilderUtils.newContainer(cId, null, null, null, null, null);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
|
|
@ -185,8 +185,11 @@ public class TestNodeManagerShutdown {
|
|||
containerLaunchContext.setCommands(commands);
|
||||
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||
mockContainer.setResource(resource);
|
||||
mockContainer.setContainerToken(getContainerToken(nm, cId, nodeId,
|
||||
cId.toString(), resource));
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
|
||||
user, resource, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), 0);
|
||||
mockContainer.setContainerToken(containerToken);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
@ -270,12 +273,6 @@ public class TestNodeManagerShutdown {
|
|||
return scriptFile;
|
||||
}
|
||||
|
||||
public static ContainerToken getContainerToken(NodeManager nm,
|
||||
ContainerId containerId, NodeId nodeId, String user, Resource resource) {
|
||||
return nm.getNMContext().getContainerTokenSecretManager()
|
||||
.createContainerToken(containerId, nodeId, user, resource);
|
||||
}
|
||||
|
||||
class TestNodeManager extends NodeManager {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -124,8 +124,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
fileWriter.write("Hello World!");
|
||||
fileWriter.close();
|
||||
|
||||
ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
// ////// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
|
||||
|
@ -155,11 +153,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
|
@ -256,11 +253,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
|
@ -372,11 +368,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
@ -438,8 +433,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
fileWriter.write("Hello World!");
|
||||
fileWriter.close();
|
||||
|
||||
ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
// ////// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
|
||||
|
@ -470,13 +463,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(containerLaunchContext);
|
||||
|
@ -562,15 +554,14 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
Container mockContainer1 = mock(Container.class);
|
||||
when(mockContainer1.getId()).thenReturn(cId1);
|
||||
// Construct the Container with Invalid RMIdentifier
|
||||
when(mockContainer1.getRMIdentifer()).thenReturn(
|
||||
(long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
|
||||
StartContainerRequest startRequest1 =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest1.setContainerLaunchContext(containerLaunchContext);
|
||||
|
||||
ContainerToken containerToken1 =
|
||||
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes());
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes(),
|
||||
(long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
|
||||
when(mockContainer1.getContainerToken()).thenReturn(containerToken1);
|
||||
startRequest1.setContainer(mockContainer1);
|
||||
boolean catchException = false;
|
||||
|
@ -592,7 +583,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
// Construct the Container with a RMIdentifier within current RM
|
||||
Container mockContainer2 = mock(Container.class);
|
||||
when(mockContainer2.getId()).thenReturn(cId2);
|
||||
when(mockContainer2.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
|
||||
when(mockContainer2.getResource()).thenReturn(mockResource);
|
||||
StartContainerRequest startRequest2 =
|
||||
|
@ -600,7 +590,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
startRequest2.setContainerLaunchContext(containerLaunchContext);
|
||||
ContainerToken containerToken2 =
|
||||
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes());
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes(),
|
||||
super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer2.getContainerToken()).thenReturn(containerToken2);
|
||||
|
||||
startRequest2.setContainer(mockContainer2);
|
||||
|
|
|
@ -485,9 +485,10 @@ public class TestApplication {
|
|||
for (int i = 0; i < numContainers; i++) {
|
||||
Container container = createMockedContainer(this.appId, i);
|
||||
containers.add(container);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
context.getContainerTokenSecretManager().startContainerSuccessful(
|
||||
new ContainerTokenIdentifier(container.getContainer().getId(), "",
|
||||
"", null, System.currentTimeMillis() + 1000, masterKey.getKeyId()));
|
||||
"", null, currentTime + 1000, masterKey.getKeyId(), currentTime));
|
||||
Assert.assertFalse(context.getContainerTokenSecretManager()
|
||||
.isValidStartContainerRequest(container.getContainer().getId()));
|
||||
}
|
||||
|
|
|
@ -548,7 +548,6 @@ public class TestContainer {
|
|||
final Container c;
|
||||
final Map<String, LocalResource> localResources;
|
||||
final Map<String, ByteBuffer> serviceData;
|
||||
final String user;
|
||||
|
||||
WrappedContainer(int appId, long timestamp, int id, String user)
|
||||
throws IOException {
|
||||
|
@ -572,7 +571,6 @@ public class TestContainer {
|
|||
dispatcher.register(AuxServicesEventType.class, auxBus);
|
||||
dispatcher.register(ApplicationEventType.class, appBus);
|
||||
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||
this.user = user;
|
||||
|
||||
ctxt = mock(ContainerLaunchContext.class);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
|
@ -584,9 +582,10 @@ public class TestContainer {
|
|||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
String host = "127.0.0.1";
|
||||
int port = 1234;
|
||||
long currentTime = System.currentTimeMillis();
|
||||
ContainerTokenIdentifier identifier =
|
||||
new ContainerTokenIdentifier(cId, "127.0.0.1", user, resource,
|
||||
System.currentTimeMillis() + 10000L, 123);
|
||||
currentTime + 10000L, 123, currentTime);
|
||||
ContainerToken token =
|
||||
BuilderUtils.newContainerToken(BuilderUtils.newNodeId(host, port),
|
||||
"password".getBytes(), identifier);
|
||||
|
|
|
@ -181,7 +181,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
|
||||
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
||||
|
@ -251,7 +250,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 1234,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
@ -382,7 +381,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
|
||||
// upload the script file so that the container can run it
|
||||
URL resource_alpha =
|
||||
|
@ -409,7 +407,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
|
|
@ -48,15 +48,13 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
|
@ -74,10 +72,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
|
@ -721,7 +718,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
|
||||
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(super.DUMMY_RM_IDENTIFIER);
|
||||
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
|
@ -746,7 +742,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
when(mockContainer.getResource()).thenReturn(r);
|
||||
when(mockContainer.getContainerToken()).thenReturn(
|
||||
BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes()));
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||
super.DUMMY_RM_IDENTIFIER));
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
|
|
@ -21,7 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
|
@ -220,8 +221,6 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
when(mockContainer.getRMIdentifer()).thenReturn(
|
||||
super.DUMMY_RM_IDENTIFIER);
|
||||
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
|
@ -250,7 +249,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes());
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
|
|
@ -64,10 +64,11 @@ public class MockContainer implements Container {
|
|||
uniqId);
|
||||
this.launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(id, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1),
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes());
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000, 123,
|
||||
"password".getBytes(), currentTime);
|
||||
this.state = ContainerState.NEW;
|
||||
|
||||
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
|
@ -183,10 +182,11 @@ public class TestNMWebServer {
|
|||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1),
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes());
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
|
||||
"password".getBytes(), currentTime);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
when(mockContainer.getId()).thenReturn(containerId);
|
||||
Container container =
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -1242,9 +1241,9 @@ public class LeafQueue implements CSQueue {
|
|||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
|
||||
// Create the container
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability, priority,
|
||||
null, ResourceManager.clusterTimeStamp);
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, priority, null);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -160,15 +159,15 @@ public class AppSchedulable extends Schedulable {
|
|||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
ContainerToken containerToken =
|
||||
containerTokenSecretManager.createContainerToken(containerId, nodeId,
|
||||
application.getUser(), capability);
|
||||
application.getUser(), capability);
|
||||
if (containerToken == null) {
|
||||
return null; // Try again later.
|
||||
}
|
||||
|
||||
// Create the container
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability, priority,
|
||||
containerToken, ResourceManager.clusterTimeStamp);
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, priority, containerToken);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
|
||||
|
@ -554,16 +553,16 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
|||
|
||||
containerToken =
|
||||
this.rmContext.getContainerTokenSecretManager()
|
||||
.createContainerToken(containerId, nodeId,
|
||||
application.getUser(), capability);
|
||||
.createContainerToken(containerId, nodeId, application.getUser(),
|
||||
capability);
|
||||
if (containerToken == null) {
|
||||
return i; // Try again later.
|
||||
}
|
||||
|
||||
// Create the container
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability, priority,
|
||||
containerToken, ResourceManager.clusterTimeStamp);
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, priority, containerToken);
|
||||
|
||||
// Allocate!
|
||||
|
||||
|
|
|
@ -25,9 +25,16 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
/**
|
||||
* SecretManager for ContainerTokens. This is RM-specific and rolls the
|
||||
|
@ -153,4 +160,37 @@ public class RMContainerTokenSecretManager extends
|
|||
activateNextMasterKey();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for creating ContainerTokens
|
||||
*
|
||||
* @param containerId
|
||||
* @param nodeId
|
||||
* @param appSubmitter
|
||||
* @param capability
|
||||
* @return the container-token
|
||||
*/
|
||||
public ContainerToken
|
||||
createContainerToken(ContainerId containerId, NodeId nodeId,
|
||||
String appSubmitter, Resource capability) {
|
||||
byte[] password;
|
||||
ContainerTokenIdentifier tokenIdentifier;
|
||||
long expiryTimeStamp =
|
||||
System.currentTimeMillis() + containerTokenExpiryInterval;
|
||||
|
||||
// Lock so that we use the same MasterKey's keyId and its bytes
|
||||
this.readLock.lock();
|
||||
try {
|
||||
tokenIdentifier =
|
||||
new ContainerTokenIdentifier(containerId, nodeId.toString(),
|
||||
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
|
||||
.getMasterKey().getKeyId(), ResourceManager.clusterTimeStamp);
|
||||
password = this.createPassword(tokenIdentifier);
|
||||
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
||||
return BuilderUtils.newContainerToken(nodeId, password, tokenIdentifier);
|
||||
}
|
||||
}
|
|
@ -188,7 +188,6 @@ public class NodeManager implements ContainerManager {
|
|||
this.nodeId, nodeHttpAddress,
|
||||
requestContainer.getResource(),
|
||||
null, null // DKDC - Doesn't matter
|
||||
, 0
|
||||
);
|
||||
|
||||
ContainerStatus containerStatus =
|
||||
|
|
|
@ -23,7 +23,9 @@ import junit.framework.Assert;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -77,8 +80,12 @@ public class TestApplicationMasterService {
|
|||
}
|
||||
|
||||
// assert RMIdentifer is set properly in allocated containers
|
||||
Assert.assertEquals(rm.clusterTimeStamp, alloc1Response
|
||||
.getAllocatedContainers().get(0).getRMIdentifer());
|
||||
Container allocatedContainer =
|
||||
alloc1Response.getAllocatedContainers().get(0);
|
||||
ContainerTokenIdentifier tokenId =
|
||||
BuilderUtils.newContainerTokenIdentifier(allocatedContainer
|
||||
.getContainerToken());
|
||||
Assert.assertEquals(MockRM.clusterTimeStamp, tokenId.getRMIdentifer());
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestRMContainerImpl {
|
|||
Priority priority = BuilderUtils.newPriority(5);
|
||||
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
"host:3465", resource, priority, null, 0);
|
||||
"host:3465", resource, priority, null);
|
||||
|
||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||
nodeId, eventHandler, expirer);
|
||||
|
@ -139,7 +139,7 @@ public class TestRMContainerImpl {
|
|||
Priority priority = BuilderUtils.newPriority(5);
|
||||
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
"host:3465", resource, priority, null, 0);
|
||||
"host:3465", resource, priority, null);
|
||||
|
||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
||||
nodeId, eventHandler, expirer);
|
||||
|
|
|
@ -223,7 +223,8 @@ public class TestContainerManagerSecurity {
|
|||
ContainerTokenIdentifier modifiedIdentifier =
|
||||
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
|
||||
dummyIdentifier.getNmHostAddress(), "testUser", modifiedResource,
|
||||
Long.MAX_VALUE, dummyIdentifier.getMasterKeyId());
|
||||
Long.MAX_VALUE, dummyIdentifier.getMasterKeyId(),
|
||||
ResourceManager.clusterTimeStamp);
|
||||
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
||||
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
||||
new Text(containerToken.getKind()), new Text(containerToken
|
||||
|
@ -340,10 +341,10 @@ public class TestContainerManagerSecurity {
|
|||
resourceManager.getRMContainerTokenSecretManager();
|
||||
final ContainerTokenIdentifier newTokenId =
|
||||
new ContainerTokenIdentifier(tokenId.getContainerID(),
|
||||
tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
|
||||
tokenId.getResource(),
|
||||
System.currentTimeMillis() - 1,
|
||||
containerTokenSecreteManager.getCurrentKey().getKeyId());
|
||||
tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
|
||||
tokenId.getResource(), System.currentTimeMillis() - 1,
|
||||
containerTokenSecreteManager.getCurrentKey().getKeyId(),
|
||||
ResourceManager.clusterTimeStamp);
|
||||
final byte[] passowrd =
|
||||
containerTokenSecreteManager.createPassword(
|
||||
newTokenId);
|
||||
|
|
Loading…
Reference in New Issue