YARN-684. ContainerManager.startContainer should use ContainerTokenIdentifier instead of the entire Container. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
14da7b7628
commit
2692675fc3
|
@ -74,6 +74,10 @@ Release 2.0.5-beta - UNRELEASED
|
|||
|
||||
YARN-716. Making ApplicationID immutable. (Siddharth Seth via vinodkv)
|
||||
|
||||
YARN-684. ContainerManager.startContainer should use
|
||||
ContainerTokenIdentifier instead of the entire Container.
|
||||
(Vinod Kumar Vavilapalli via sseth)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||
|
|
|
@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.api.protocolrecords;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
|
||||
/**
|
||||
* <p>The request sent by the <code>ApplicationMaster</code> to the
|
||||
|
@ -62,9 +62,9 @@ public interface StartContainerRequest {
|
|||
|
||||
@Public
|
||||
@Stable
|
||||
public Container getContainer();
|
||||
public ContainerToken getContainerToken();
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public void setContainer(Container container);
|
||||
public void setContainerToken(ContainerToken container);
|
||||
}
|
||||
|
|
|
@ -19,14 +19,14 @@
|
|||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerTokenPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProtoOrBuilder;
|
||||
|
||||
|
@ -39,7 +39,7 @@ public class StartContainerRequestPBImpl extends ProtoBase<StartContainerRequest
|
|||
|
||||
private ContainerLaunchContext containerLaunchContext = null;
|
||||
|
||||
private Container container = null;
|
||||
private ContainerToken containerToken = null;
|
||||
|
||||
public StartContainerRequestPBImpl() {
|
||||
builder = StartContainerRequestProto.newBuilder();
|
||||
|
@ -61,8 +61,8 @@ public class StartContainerRequestPBImpl extends ProtoBase<StartContainerRequest
|
|||
if (this.containerLaunchContext != null) {
|
||||
builder.setContainerLaunchContext(convertToProtoFormat(this.containerLaunchContext));
|
||||
}
|
||||
if(this.container != null) {
|
||||
builder.setContainer(convertToProtoFormat(this.container));
|
||||
if(this.containerToken != null) {
|
||||
builder.setContainerToken(convertToProtoFormat(this.containerToken));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -104,25 +104,25 @@ public class StartContainerRequestPBImpl extends ProtoBase<StartContainerRequest
|
|||
}
|
||||
|
||||
@Override
|
||||
public Container getContainer() {
|
||||
public ContainerToken getContainerToken() {
|
||||
StartContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.container != null) {
|
||||
return this.container;
|
||||
if (this.containerToken != null) {
|
||||
return this.containerToken;
|
||||
}
|
||||
if (!p.hasContainer()) {
|
||||
if (!p.hasContainerToken()) {
|
||||
return null;
|
||||
}
|
||||
this.container = convertFromProtoFormat(p.getContainer());
|
||||
return this.container;
|
||||
this.containerToken = convertFromProtoFormat(p.getContainerToken());
|
||||
return this.containerToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContainer(Container container) {
|
||||
public void setContainerToken(ContainerToken containerToken) {
|
||||
maybeInitBuilder();
|
||||
if(container == null) {
|
||||
builder.clearContainer();
|
||||
if(containerToken == null) {
|
||||
builder.clearContainerToken();
|
||||
}
|
||||
this.container = container;
|
||||
this.containerToken = containerToken;
|
||||
}
|
||||
|
||||
private ContainerLaunchContextPBImpl convertFromProtoFormat(ContainerLaunchContextProto p) {
|
||||
|
@ -135,11 +135,11 @@ public class StartContainerRequestPBImpl extends ProtoBase<StartContainerRequest
|
|||
|
||||
|
||||
|
||||
private ContainerPBImpl convertFromProtoFormat(ContainerProto containerProto) {
|
||||
return new ContainerPBImpl(containerProto);
|
||||
private ContainerTokenPBImpl convertFromProtoFormat(TokenProto containerProto) {
|
||||
return new ContainerTokenPBImpl(containerProto);
|
||||
}
|
||||
|
||||
private ContainerProto convertToProtoFormat(Container container) {
|
||||
return ((ContainerPBImpl)container).getProto();
|
||||
private TokenProto convertToProtoFormat(ContainerToken container) {
|
||||
return ((ContainerTokenPBImpl)container).getProto();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -173,7 +173,7 @@ message GetQueueUserAclsInfoResponseProto {
|
|||
|
||||
message StartContainerRequestProto {
|
||||
optional ContainerLaunchContextProto container_launch_context = 1;
|
||||
optional ContainerProto container = 2;
|
||||
optional hadoop.common.TokenProto container_token = 2;
|
||||
}
|
||||
|
||||
message StartContainerResponseProto {
|
||||
|
|
|
@ -763,7 +763,7 @@ public class ApplicationMaster {
|
|||
StartContainerRequest startReq = Records
|
||||
.newRecord(StartContainerRequest.class);
|
||||
startReq.setContainerLaunchContext(ctx);
|
||||
startReq.setContainer(container);
|
||||
startReq.setContainerToken(container.getContainerToken());
|
||||
try {
|
||||
cm.startContainer(startReq);
|
||||
} catch (YarnRemoteException e) {
|
||||
|
|
|
@ -212,7 +212,7 @@ public class NMClientImpl extends AbstractService implements NMClient {
|
|||
try {
|
||||
StartContainerRequest startRequest =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainer(container);
|
||||
startRequest.setContainerToken(container.getContainerToken());
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startResponse = containerManager.startContainer(startRequest);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -39,11 +39,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -89,23 +90,25 @@ public class TestContainerLaunchRPC {
|
|||
server.getListenerAddress(), conf);
|
||||
ContainerLaunchContext containerLaunchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
ContainerId containerId = recordFactory
|
||||
.newRecordInstance(ContainerId.class);
|
||||
|
||||
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId applicationAttemptId = recordFactory
|
||||
.newRecordInstance(ApplicationAttemptId.class);
|
||||
applicationAttemptId.setApplicationId(applicationId);
|
||||
applicationAttemptId.setAttemptId(0);
|
||||
containerId.setApplicationAttemptId(applicationAttemptId);
|
||||
containerId.setId(100);
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||
.newRecordInstance(Resource.class), null, null);
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
ApplicationAttemptId.newInstance(applicationId, 0);
|
||||
ContainerId containerId =
|
||||
ContainerId.newInstance(applicationAttemptId, 100);
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
||||
Resource resource = Resource.newInstance(1234, 2);
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(containerId, "localhost", "user",
|
||||
resource, System.currentTimeMillis() + 10000, 42, 42);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(nodeId, "password".getBytes(),
|
||||
containerTokenIdentifier);
|
||||
|
||||
StartContainerRequest scRequest = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
scRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
scRequest.setContainer(container);
|
||||
scRequest.setContainerToken(containerToken);
|
||||
try {
|
||||
proxy.startContainer(scRequest);
|
||||
} catch (Exception e) {
|
||||
|
@ -138,9 +141,6 @@ public class TestContainerLaunchRPC {
|
|||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException, IOException {
|
||||
StartContainerResponse response = recordFactory
|
||||
.newRecordInstance(StartContainerResponse.class);
|
||||
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
||||
try {
|
||||
// make the thread sleep to look like its not going to respond
|
||||
Thread.sleep(10000);
|
||||
|
@ -148,10 +148,7 @@ public class TestContainerLaunchRPC {
|
|||
LOG.error(e);
|
||||
throw new YarnRemoteException(e);
|
||||
}
|
||||
status.setState(ContainerState.RUNNING);
|
||||
status.setContainerId(request.getContainer().getId());
|
||||
status.setExitStatus(0);
|
||||
return response;
|
||||
throw new YarnRemoteException("Shouldn't happen!!");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
@ -39,18 +40,21 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Test;
|
||||
|
@ -111,30 +115,29 @@ public class TestRPC {
|
|||
NetUtils.getConnectAddress(server), conf);
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
ContainerId containerId =
|
||||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
|
||||
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
recordFactory.newRecordInstance(ApplicationAttemptId.class);
|
||||
applicationAttemptId.setApplicationId(applicationId);
|
||||
applicationAttemptId.setAttemptId(0);
|
||||
containerId.setApplicationAttemptId(applicationAttemptId);
|
||||
containerId.setId(100);
|
||||
Container mockContainer =
|
||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||
.newRecordInstance(Resource.class), null, null);
|
||||
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
|
||||
ApplicationAttemptId.newInstance(applicationId, 0);
|
||||
ContainerId containerId =
|
||||
ContainerId.newInstance(applicationAttemptId, 100);
|
||||
StartContainerRequest scRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
scRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
scRequest.setContainer(mockContainer);
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
||||
Resource resource = Resource.newInstance(1234, 2);
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(containerId, "localhost", "user",
|
||||
resource, System.currentTimeMillis() + 10000, 42, 42);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(nodeId, "password".getBytes(),
|
||||
containerTokenIdentifier);
|
||||
scRequest.setContainerToken(containerToken);
|
||||
proxy.startContainer(scRequest);
|
||||
|
||||
GetContainerStatusRequest gcsRequest =
|
||||
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
||||
gcsRequest.setContainerId(mockContainer.getId());
|
||||
gcsRequest.setContainerId(containerId);
|
||||
GetContainerStatusResponse response = proxy.getContainerStatus(gcsRequest);
|
||||
ContainerStatus status = response.getStatus();
|
||||
|
||||
|
@ -142,7 +145,7 @@ public class TestRPC {
|
|||
boolean exception = false;
|
||||
try {
|
||||
StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
|
||||
stopRequest.setContainerId(mockContainer.getId());
|
||||
stopRequest.setContainerId(containerId);
|
||||
proxy.stopContainer(stopRequest);
|
||||
} catch (YarnRemoteException e) {
|
||||
exception = true;
|
||||
|
@ -176,11 +179,19 @@ public class TestRPC {
|
|||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
ContainerToken containerToken = request.getContainerToken();
|
||||
ContainerTokenIdentifier tokenId = null;
|
||||
|
||||
try {
|
||||
tokenId = BuilderUtils.newContainerTokenIdentifier(containerToken);
|
||||
} catch (IOException e) {
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
StartContainerResponse response =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
||||
status.setState(ContainerState.RUNNING);
|
||||
status.setContainerId(request.getContainer().getId());
|
||||
status.setContainerId(tokenId.getContainerID());
|
||||
status.setExitStatus(0);
|
||||
return response;
|
||||
}
|
||||
|
|
|
@ -42,6 +42,13 @@ public interface Context {
|
|||
*/
|
||||
NodeId getNodeId();
|
||||
|
||||
/**
|
||||
* Return the node http-address. Usable only after the Webserver is started.
|
||||
*
|
||||
* @return the http-port
|
||||
*/
|
||||
int getHttpPort();
|
||||
|
||||
ConcurrentMap<ApplicationId, Application> getApplications();
|
||||
|
||||
ConcurrentMap<ContainerId, Container> getContainers();
|
||||
|
|
|
@ -113,7 +113,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|||
List<String> localDirs, List<String> logDirs) throws IOException {
|
||||
|
||||
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
ContainerId containerId = container.getContainerId();
|
||||
|
||||
// create container dirs on all disks
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
|
|
|
@ -216,11 +216,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|||
String user, String appId, Path containerWorkDir,
|
||||
List<String> localDirs, List<String> logDirs) throws IOException {
|
||||
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
ContainerId containerId = container.getContainerId();
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
|
||||
resourcesHandler.preExecute(containerId,
|
||||
container.getContainer().getResource());
|
||||
container.getResource());
|
||||
String resourcesOptions = resourcesHandler.getResourcesOption(
|
||||
containerId);
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -56,7 +55,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
|
|||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -173,9 +171,10 @@ public class NodeManager extends CompositeService
|
|||
addService(containerManager);
|
||||
((NMContext) context).setContainerManager(containerManager);
|
||||
|
||||
Service webServer = createWebServer(context, containerManager
|
||||
WebServer webServer = createWebServer(context, containerManager
|
||||
.getContainersMonitor(), this.aclsManager, dirsHandler);
|
||||
addService(webServer);
|
||||
((NMContext) context).setWebServer(webServer);
|
||||
|
||||
dispatcher.register(ContainerManagerEventType.class, containerManager);
|
||||
dispatcher.register(NodeManagerEventType.class, this);
|
||||
|
@ -297,6 +296,7 @@ public class NodeManager extends CompositeService
|
|||
|
||||
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private ContainerManager containerManager;
|
||||
private WebServer webServer;
|
||||
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
|
||||
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
|
||||
|
||||
|
@ -315,6 +315,11 @@ public class NodeManager extends CompositeService
|
|||
return this.nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHttpPort() {
|
||||
return this.webServer.getPort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, Application> getApplications() {
|
||||
return this.applications;
|
||||
|
@ -342,6 +347,10 @@ public class NodeManager extends CompositeService
|
|||
public void setContainerManager(ContainerManager containerManager) {
|
||||
this.containerManager = containerManager;
|
||||
}
|
||||
|
||||
public void setWebServer(WebServer webServer) {
|
||||
this.webServer = webServer;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -352,12 +361,6 @@ public class NodeManager extends CompositeService
|
|||
return nodeHealthChecker;
|
||||
}
|
||||
|
||||
private void reboot() {
|
||||
LOG.info("Rebooting the node manager.");
|
||||
NodeManager nodeManager = createNewNodeManager();
|
||||
nodeManager.initAndStartNodeManager(this.getConfig(), true);
|
||||
}
|
||||
|
||||
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
|
||||
try {
|
||||
|
||||
|
|
|
@ -155,13 +155,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
|
||||
// NodeManager is the last service to start, so NodeId is available.
|
||||
this.nodeId = this.context.getNodeId();
|
||||
|
||||
InetSocketAddress httpBindAddress = getConfig().getSocketAddr(
|
||||
YarnConfiguration.NM_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
|
||||
this.httpPort = this.context.getHttpPort();
|
||||
try {
|
||||
this.httpPort = httpBindAddress.getPort();
|
||||
// Registration has to be in start so that ContainerManager can get the
|
||||
// perNM tokens needed to authenticate ContainerTokens.
|
||||
registerWithRM();
|
||||
|
|
|
@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
@ -272,14 +272,13 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
|
||||
// Get the remoteUGI corresponding to the api call.
|
||||
private UserGroupInformation getRemoteUgi(String containerIDStr)
|
||||
private UserGroupInformation getRemoteUgi()
|
||||
throws YarnRemoteException {
|
||||
UserGroupInformation remoteUgi;
|
||||
try {
|
||||
remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException e) {
|
||||
String msg = "Cannot obtain the user-name for containerId: "
|
||||
+ containerIDStr + ". Got exception: "
|
||||
String msg = "Cannot obtain the user-name. Got exception: "
|
||||
+ StringUtils.stringifyException(e);
|
||||
LOG.warn(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
|
@ -307,7 +306,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
@VisibleForTesting
|
||||
protected ContainerTokenIdentifier getContainerTokenIdentifier(
|
||||
UserGroupInformation remoteUgi,
|
||||
org.apache.hadoop.yarn.api.records.Container container)
|
||||
ContainerTokenIdentifier containerTokenIdentifier)
|
||||
throws YarnRemoteException {
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -317,12 +316,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
// Get the tokenId from the remote user ugi
|
||||
return selectContainerTokenIdentifier(remoteUgi);
|
||||
} else {
|
||||
try {
|
||||
return BuilderUtils.newContainerTokenIdentifier(container
|
||||
.getContainerToken());
|
||||
} catch (IOException e) {
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
return containerTokenIdentifier;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -341,7 +335,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
@VisibleForTesting
|
||||
protected void authorizeRequest(String containerIDStr,
|
||||
ContainerLaunchContext launchContext,
|
||||
org.apache.hadoop.yarn.api.records.Container container,
|
||||
UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
|
||||
throws YarnRemoteException {
|
||||
|
||||
|
@ -380,13 +373,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
+ System.currentTimeMillis() + " found "
|
||||
+ tokenId.getExpiryTimeStamp());
|
||||
}
|
||||
|
||||
Resource resource = tokenId.getResource();
|
||||
if (resource == null || !resource.equals(container.getResource())) {
|
||||
unauthorized = true;
|
||||
messageBuilder.append("\nExpected resource " + resource
|
||||
+ " but found " + container.getResource());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -412,16 +398,23 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
|
||||
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
||||
org.apache.hadoop.yarn.api.records.Container lauchContainer =
|
||||
request.getContainer();
|
||||
ContainerId containerID = lauchContainer.getId();
|
||||
ContainerToken token = request.getContainerToken();
|
||||
|
||||
ContainerTokenIdentifier tokenIdentifier = null;
|
||||
try {
|
||||
tokenIdentifier = BuilderUtils.newContainerTokenIdentifier(token);
|
||||
} catch (IOException e) {
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi();
|
||||
ContainerTokenIdentifier tokenId =
|
||||
getContainerTokenIdentifier(remoteUgi, tokenIdentifier);
|
||||
|
||||
ContainerId containerID = tokenId.getContainerID();
|
||||
String containerIDStr = containerID.toString();
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||
ContainerTokenIdentifier tokenId =
|
||||
getContainerTokenIdentifier(remoteUgi, lauchContainer);
|
||||
authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi,
|
||||
tokenId);
|
||||
authorizeRequest(containerIDStr, launchContext, remoteUgi, tokenId);
|
||||
|
||||
// Is the container coming from unknown RM
|
||||
if (tokenId.getRMIdentifer() != nodeStatusUpdater
|
||||
|
@ -458,8 +451,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
// //////////// End of parsing credentials
|
||||
String user = tokenId.getApplicationSubmitter();
|
||||
|
||||
Container container = new ContainerImpl(getConfig(), this.dispatcher,
|
||||
launchContext, lauchContainer, credentials, metrics, tokenId);
|
||||
Container container =
|
||||
new ContainerImpl(getConfig(), this.dispatcher, launchContext,
|
||||
credentials, metrics, tokenId);
|
||||
ApplicationId applicationID =
|
||||
containerID.getApplicationAttemptId().getApplicationId();
|
||||
if (context.getContainers().putIfAbsent(containerID, container) != null) {
|
||||
|
@ -501,7 +495,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
|
||||
// launch. A finished Application will not launch containers.
|
||||
metrics.launchedContainer();
|
||||
metrics.allocateContainer(lauchContainer.getResource());
|
||||
metrics.allocateContainer(tokenId.getResource());
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -518,7 +512,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
// TODO: Only the container's owner can kill containers today.
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||
UserGroupInformation remoteUgi = getRemoteUgi();
|
||||
Container container = this.context.getContainers().get(containerID);
|
||||
StopContainerResponse response =
|
||||
recordFactory.newRecordInstance(StopContainerResponse.class);
|
||||
|
@ -532,8 +526,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
containerID);
|
||||
return response; // Return immediately.
|
||||
}
|
||||
authorizeRequest(containerIDStr, null, null, remoteUgi,
|
||||
getContainerTokenIdentifier(remoteUgi, container.getContainer()));
|
||||
authorizeRequest(containerIDStr, null, remoteUgi,
|
||||
getContainerTokenIdentifier(remoteUgi, container.getContainerTokenIdentifier()));
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerID,
|
||||
|
@ -561,15 +555,15 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
// TODO: Only the container's owner can get containers' status today.
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||
UserGroupInformation remoteUgi = getRemoteUgi();
|
||||
LOG.info("Getting container-status for " + containerIDStr);
|
||||
Container container = this.context.getContainers().get(containerID);
|
||||
if (container == null) {
|
||||
throw RPCUtil.getRemoteException("Container " + containerIDStr
|
||||
+ " is not handled by this NodeManager");
|
||||
}
|
||||
authorizeRequest(containerIDStr, null, null, remoteUgi,
|
||||
getContainerTokenIdentifier(remoteUgi, container.getContainer()));
|
||||
authorizeRequest(containerIDStr, null, remoteUgi,
|
||||
getContainerTokenIdentifier(remoteUgi, container.getContainerTokenIdentifier()));
|
||||
|
||||
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
|
||||
LOG.info("Returning " + containerStatus);
|
||||
|
|
|
@ -36,7 +36,7 @@ public class ApplicationContainerInitEvent extends ApplicationEvent {
|
|||
final Container container;
|
||||
|
||||
public ApplicationContainerInitEvent(Container container) {
|
||||
super(container.getContainer().getId().getApplicationAttemptId()
|
||||
super(container.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId(), ApplicationEventType.INIT_CONTAINER);
|
||||
this.container = container;
|
||||
}
|
||||
|
|
|
@ -273,14 +273,14 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationContainerInitEvent initEvent =
|
||||
(ApplicationContainerInitEvent) event;
|
||||
Container container = initEvent.getContainer();
|
||||
app.containers.put(container.getContainer().getId(), container);
|
||||
LOG.info("Adding " + container.getContainer().getId()
|
||||
app.containers.put(container.getContainerId(), container);
|
||||
LOG.info("Adding " + container.getContainerId()
|
||||
+ " to application " + app.toString());
|
||||
|
||||
switch (app.getApplicationState()) {
|
||||
case RUNNING:
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainer().getId()));
|
||||
container.getContainerId()));
|
||||
break;
|
||||
case INITING:
|
||||
case NEW:
|
||||
|
@ -301,7 +301,7 @@ public class ApplicationImpl implements Application {
|
|||
// Start all the containers waiting for ApplicationInit
|
||||
for (Container container : app.containers.values()) {
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainer().getId()));
|
||||
container.getContainerId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,13 +23,20 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
|
||||
public interface Container extends EventHandler<ContainerEvent> {
|
||||
|
||||
org.apache.hadoop.yarn.api.records.Container getContainer();
|
||||
ContainerId getContainerId();
|
||||
|
||||
Resource getResource();
|
||||
|
||||
ContainerTokenIdentifier getContainerTokenIdentifier();
|
||||
|
||||
String getUser();
|
||||
|
||||
|
|
|
@ -37,11 +37,13 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
@ -76,7 +78,9 @@ public class ContainerImpl implements Container {
|
|||
private final Credentials credentials;
|
||||
private final NodeManagerMetrics metrics;
|
||||
private final ContainerLaunchContext launchContext;
|
||||
private final org.apache.hadoop.yarn.api.records.Container container;
|
||||
private final ContainerTokenIdentifier containerTokenIdentifier;
|
||||
private final ContainerId containerId;
|
||||
private final Resource resource;
|
||||
private final String user;
|
||||
private int exitCode = ContainerExitStatus.INVALID;
|
||||
private final StringBuilder diagnostics;
|
||||
|
@ -97,18 +101,19 @@ public class ContainerImpl implements Container {
|
|||
new ArrayList<LocalResourceRequest>();
|
||||
|
||||
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
|
||||
ContainerLaunchContext launchContext,
|
||||
org.apache.hadoop.yarn.api.records.Container container,
|
||||
Credentials creds, NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier identifier) throws IOException {
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
|
||||
this.daemonConf = conf;
|
||||
this.dispatcher = dispatcher;
|
||||
this.launchContext = launchContext;
|
||||
this.container = container;
|
||||
this.containerTokenIdentifier = containerTokenIdentifier;
|
||||
this.containerId = containerTokenIdentifier.getContainerID();
|
||||
this.resource = containerTokenIdentifier.getResource();
|
||||
this.diagnostics = new StringBuilder();
|
||||
this.credentials = creds;
|
||||
this.metrics = metrics;
|
||||
user = identifier.getApplicationSubmitter();
|
||||
user = containerTokenIdentifier.getApplicationSubmitter();
|
||||
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||
this.readLock = readWriteLock.readLock();
|
||||
this.writeLock = readWriteLock.writeLock();
|
||||
|
@ -366,7 +371,7 @@ public class ContainerImpl implements Container {
|
|||
public ContainerStatus cloneAndGetContainerStatus() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return BuilderUtils.newContainerStatus(this.container.getId(),
|
||||
return BuilderUtils.newContainerStatus(this.containerId,
|
||||
getCurrentState(), diagnostics.toString(), exitCode);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
|
@ -374,10 +379,20 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.yarn.api.records.Container getContainer() {
|
||||
public ContainerId getContainerId() {
|
||||
return this.containerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerTokenIdentifier getContainerTokenIdentifier() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.container;
|
||||
return this.containerTokenIdentifier;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
@ -385,15 +400,15 @@ public class ContainerImpl implements Container {
|
|||
|
||||
@SuppressWarnings({"fallthrough", "unchecked"})
|
||||
private void finished() {
|
||||
ContainerId containerID = this.container.getId();
|
||||
ApplicationId applicationId =
|
||||
containerId.getApplicationAttemptId().getApplicationId();
|
||||
switch (getContainerState()) {
|
||||
case EXITED_WITH_SUCCESS:
|
||||
metrics.endRunningContainer();
|
||||
metrics.completedContainer();
|
||||
NMAuditLogger.logSuccess(user,
|
||||
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
|
||||
containerID.getApplicationAttemptId()
|
||||
.getApplicationId(), containerID);
|
||||
applicationId, containerId);
|
||||
break;
|
||||
case EXITED_WITH_FAILURE:
|
||||
metrics.endRunningContainer();
|
||||
|
@ -403,8 +418,7 @@ public class ContainerImpl implements Container {
|
|||
NMAuditLogger.logFailure(user,
|
||||
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
|
||||
"Container failed with state: " + getContainerState(),
|
||||
containerID.getApplicationAttemptId()
|
||||
.getApplicationId(), containerID);
|
||||
applicationId, containerId);
|
||||
break;
|
||||
case CONTAINER_CLEANEDUP_AFTER_KILL:
|
||||
metrics.endRunningContainer();
|
||||
|
@ -413,21 +427,21 @@ public class ContainerImpl implements Container {
|
|||
metrics.killedContainer();
|
||||
NMAuditLogger.logSuccess(user,
|
||||
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
|
||||
containerID.getApplicationAttemptId().getApplicationId(),
|
||||
containerID);
|
||||
applicationId,
|
||||
containerId);
|
||||
}
|
||||
|
||||
metrics.releaseContainer(this.container.getResource());
|
||||
metrics.releaseContainer(this.resource);
|
||||
|
||||
// Inform the application
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler eventHandler = dispatcher.getEventHandler();
|
||||
eventHandler.handle(new ApplicationContainerFinishedEvent(containerID));
|
||||
eventHandler.handle(new ApplicationContainerFinishedEvent(containerId));
|
||||
// Remove the container from the resource-monitor
|
||||
eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
|
||||
eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
|
||||
// Tell the logService too
|
||||
eventHandler.handle(new LogHandlerContainerFinishedEvent(
|
||||
containerID, exitCode));
|
||||
containerId, exitCode));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
|
@ -489,7 +503,7 @@ public class ContainerImpl implements Container {
|
|||
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
|
||||
container.user, container.container.getId()
|
||||
container.user, container.containerId
|
||||
.getApplicationAttemptId().getApplicationId(),
|
||||
service.getKey().toString(), service.getValue()));
|
||||
}
|
||||
|
@ -574,7 +588,7 @@ public class ContainerImpl implements Container {
|
|||
container.pendingResources.remove(rsrcEvent.getResource());
|
||||
if (null == syms) {
|
||||
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
||||
" for container " + container.container.getId());
|
||||
" for container " + container.containerId);
|
||||
assert false;
|
||||
// fail container?
|
||||
return ContainerState.LOCALIZING;
|
||||
|
@ -602,14 +616,14 @@ public class ContainerImpl implements Container {
|
|||
// Inform the ContainersMonitor to start monitoring the container's
|
||||
// resource usage.
|
||||
long pmemBytes =
|
||||
container.container.getResource().getMemory() * 1024 * 1024L;
|
||||
container.getResource().getMemory() * 1024 * 1024L;
|
||||
float pmemRatio = container.daemonConf.getFloat(
|
||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
||||
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainerStartMonitoringEvent(container.container.getId(),
|
||||
new ContainerStartMonitoringEvent(container.containerId,
|
||||
vmemBytes, pmemBytes));
|
||||
container.metrics.runningContainer();
|
||||
}
|
||||
|
@ -743,7 +757,7 @@ public class ContainerImpl implements Container {
|
|||
container.pendingResources.remove(rsrcEvent.getResource());
|
||||
if (null == syms) {
|
||||
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
||||
" for container " + container.container.getId());
|
||||
" for container " + container.containerId);
|
||||
assert false;
|
||||
// fail container?
|
||||
return;
|
||||
|
@ -848,7 +862,7 @@ public class ContainerImpl implements Container {
|
|||
public String toString() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return ConverterUtils.toString(container.getId());
|
||||
return ConverterUtils.toString(this.containerId);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
|
|
@ -22,8 +22,8 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
|||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
|
@ -53,12 +53,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
|
@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
public class ContainerLaunch implements Callable<Integer> {
|
||||
|
@ -86,6 +85,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
private final Application app;
|
||||
private final Container container;
|
||||
private final Configuration conf;
|
||||
private final Context context;
|
||||
|
||||
private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
|
||||
private volatile AtomicBoolean completed = new AtomicBoolean(false);
|
||||
|
@ -97,9 +97,10 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
|
||||
private final LocalDirsHandlerService dirsHandler;
|
||||
|
||||
public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
|
||||
ContainerExecutor exec, Application app, Container container,
|
||||
LocalDirsHandlerService dirsHandler) {
|
||||
public ContainerLaunch(Context context, Configuration configuration,
|
||||
Dispatcher dispatcher, ContainerExecutor exec, Application app,
|
||||
Container container, LocalDirsHandlerService dirsHandler) {
|
||||
this.context = context;
|
||||
this.conf = configuration;
|
||||
this.app = app;
|
||||
this.exec = exec;
|
||||
|
@ -120,7 +121,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
final ContainerLaunchContext launchContext = container.getLaunchContext();
|
||||
final Map<Path,List<String>> localResources =
|
||||
container.getLocalizedResources();
|
||||
ContainerId containerID = container.getContainer().getId();
|
||||
ContainerId containerID = container.getContainerId();
|
||||
String containerIdStr = ConverterUtils.toString(containerID);
|
||||
final List<String> command = launchContext.getCommands();
|
||||
int ret = -1;
|
||||
|
@ -301,7 +302,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void cleanupContainer() throws IOException {
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
ContainerId containerId = container.getContainerId();
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
LOG.info("Cleaning up container " + containerIdStr);
|
||||
|
||||
|
@ -372,7 +373,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
*/
|
||||
private String getContainerPid(Path pidFilePath) throws Exception {
|
||||
String containerIdStr =
|
||||
ConverterUtils.toString(container.getContainer().getId());
|
||||
ConverterUtils.toString(container.getContainerId());
|
||||
String processId = null;
|
||||
LOG.debug("Accessing pid for container " + containerIdStr
|
||||
+ " from pid file " + pidFilePath);
|
||||
|
@ -550,16 +551,16 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
*/
|
||||
|
||||
environment.put(Environment.CONTAINER_ID.name(), container
|
||||
.getContainer().getId().toString());
|
||||
.getContainerId().toString());
|
||||
|
||||
environment.put(Environment.NM_PORT.name(),
|
||||
String.valueOf(container.getContainer().getNodeId().getPort()));
|
||||
String.valueOf(this.context.getNodeId().getPort()));
|
||||
|
||||
environment.put(Environment.NM_HOST.name(), container.getContainer()
|
||||
.getNodeId().getHost());
|
||||
environment.put(Environment.NM_HOST.name(), this.context.getNodeId()
|
||||
.getHost());
|
||||
|
||||
environment.put(Environment.NM_HTTP_PORT.name(), container.getContainer()
|
||||
.getNodeHttpAddress().split(":")[1]);
|
||||
environment.put(Environment.NM_HTTP_PORT.name(),
|
||||
String.valueOf(this.context.getHttpPort()));
|
||||
|
||||
environment.put(Environment.LOCAL_DIRS.name(),
|
||||
StringUtils.join(",", appDirs));
|
||||
|
|
|
@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
@ -111,15 +111,16 @@ public class ContainersLauncher extends AbstractService
|
|||
public void handle(ContainersLauncherEvent event) {
|
||||
// TODO: ContainersLauncher launches containers one by one!!
|
||||
Container container = event.getContainer();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
ContainerId containerId = container.getContainerId();
|
||||
switch (event.getType()) {
|
||||
case LAUNCH_CONTAINER:
|
||||
Application app =
|
||||
context.getApplications().get(
|
||||
containerId.getApplicationAttemptId().getApplicationId());
|
||||
|
||||
ContainerLaunch launch = new ContainerLaunch(getConfig(), dispatcher,
|
||||
exec, app, event.getContainer(), dirsHandler);
|
||||
ContainerLaunch launch =
|
||||
new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
|
||||
event.getContainer(), dirsHandler);
|
||||
running.put(containerId,
|
||||
new RunningContainer(containerLauncher.submit(launch),
|
||||
launch));
|
||||
|
|
|
@ -359,14 +359,14 @@ public class ResourceLocalizationService extends CompositeService
|
|||
ContainerLocalizationRequestEvent rsrcReqs) {
|
||||
Container c = rsrcReqs.getContainer();
|
||||
LocalizerContext ctxt = new LocalizerContext(
|
||||
c.getUser(), c.getContainer().getId(), c.getCredentials());
|
||||
c.getUser(), c.getContainerId(), c.getCredentials());
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||
rsrcReqs.getRequestedResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker =
|
||||
getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainer().getId().getApplicationAttemptId()
|
||||
c.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
|
||||
|
@ -396,21 +396,21 @@ public class ResourceLocalizationService extends CompositeService
|
|||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainer().getId().getApplicationAttemptId()
|
||||
c.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceReleaseEvent(req,
|
||||
c.getContainer().getId()));
|
||||
c.getContainerId()));
|
||||
}
|
||||
}
|
||||
String locId = ConverterUtils.toString(c.getContainer().getId());
|
||||
String locId = ConverterUtils.toString(c.getContainerId());
|
||||
localizerTracker.cleanupPrivLocalizers(locId);
|
||||
|
||||
// Delete the container directories
|
||||
String userName = c.getUser();
|
||||
String containerIDStr = c.toString();
|
||||
String appIDStr = ConverterUtils.toString(
|
||||
c.getContainer().getId().getApplicationAttemptId().getApplicationId());
|
||||
c.getContainerId().getApplicationAttemptId().getApplicationId());
|
||||
for (String localDir : dirsHandler.getLocalDirs()) {
|
||||
|
||||
// Delete the user-owned container-dir
|
||||
|
@ -430,7 +430,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerEvent(c.getContainer().getId(),
|
||||
new ContainerEvent(c.getContainerId(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
|||
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -43,6 +42,7 @@ public class WebServer extends AbstractService {
|
|||
private final Context nmContext;
|
||||
private final NMWebApp nmWebApp;
|
||||
private WebApp webApp;
|
||||
private int port;
|
||||
|
||||
public WebServer(Context nmContext, ResourceView resourceView,
|
||||
ApplicationACLsManager aclsManager,
|
||||
|
@ -66,9 +66,7 @@ public class WebServer extends AbstractService {
|
|||
this.webApp =
|
||||
WebApps.$for("node", Context.class, this.nmContext, "ws")
|
||||
.at(bindAddress).with(getConfig()).start(this.nmWebApp);
|
||||
int port = this.webApp.httpServer().getPort();
|
||||
String webAddress = StringUtils.split(bindAddress, ':')[0] + ":" + port;
|
||||
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddress);
|
||||
this.port = this.webApp.httpServer().getPort();
|
||||
} catch (Exception e) {
|
||||
String msg = "NMWebapps failed to start.";
|
||||
LOG.error(msg, e);
|
||||
|
@ -77,6 +75,10 @@ public class WebServer extends AbstractService {
|
|||
super.start();
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return this.port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
if (this.webApp != null) {
|
||||
|
|
|
@ -29,7 +29,6 @@ import javax.xml.bind.annotation.XmlTransient;
|
|||
import org.apache.hadoop.yarn.api.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
||||
|
@ -60,7 +59,7 @@ public class ContainerInfo {
|
|||
public ContainerInfo(final Context nmContext, final Container container,
|
||||
String requestUri, String pathPrefix) {
|
||||
|
||||
this.id = container.getContainer().getId().toString();
|
||||
this.id = container.getContainerId().toString();
|
||||
this.nodeId = nmContext.getNodeId().toString();
|
||||
ContainerStatus containerData = container.cloneAndGetContainerStatus();
|
||||
this.exitCode = containerData.getExitStatus();
|
||||
|
@ -74,7 +73,7 @@ public class ContainerInfo {
|
|||
}
|
||||
|
||||
this.user = container.getUser();
|
||||
Resource res = container.getContainer().getResource();
|
||||
Resource res = container.getResource();
|
||||
if (res != null) {
|
||||
this.totalMemoryNeededMB = res.getMemory();
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -54,7 +53,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Log
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
|
||||
public class DummyContainerManager extends ContainerManagerImpl {
|
||||
|
||||
|
@ -94,10 +92,10 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
.getRequestedResources().values()) {
|
||||
for (LocalResourceRequest req : rc) {
|
||||
LOG.info("DEBUG: " + req + ":"
|
||||
+ rsrcReqs.getContainer().getContainer().getId());
|
||||
+ rsrcReqs.getContainer().getContainerId());
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
|
||||
.getContainer().getId(), req, new Path("file:///local"
|
||||
.getContainerId(), req, new Path("file:///local"
|
||||
+ req.getPath().toUri().getPath())));
|
||||
}
|
||||
}
|
||||
|
@ -107,7 +105,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
((ContainerLocalizationEvent) event).getContainer();
|
||||
// TODO: delete the container dir
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ContainerEvent(container.getContainer().getId(),
|
||||
new ContainerEvent(container.getContainerId(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
break;
|
||||
case DESTROY_APPLICATION_RESOURCES:
|
||||
|
@ -136,7 +134,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
@Override
|
||||
public void handle(ContainersLauncherEvent event) {
|
||||
Container container = event.getContainer();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
ContainerId containerId = container.getContainerId();
|
||||
switch (event.getType()) {
|
||||
case LAUNCH_CONTAINER:
|
||||
dispatcher.getEventHandler().handle(
|
||||
|
@ -183,25 +181,15 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
@Override
|
||||
protected void authorizeRequest(String containerIDStr,
|
||||
ContainerLaunchContext launchContext,
|
||||
org.apache.hadoop.yarn.api.records.Container container,
|
||||
UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
|
||||
throws YarnRemoteException {
|
||||
// do Nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerTokenIdentifier getContainerTokenIdentifier(
|
||||
UserGroupInformation remoteUgi,
|
||||
org.apache.hadoop.yarn.api.records.Container container)
|
||||
throws YarnRemoteException {
|
||||
try {
|
||||
return new ContainerTokenIdentifier(container.getId(),
|
||||
container.getNodeHttpAddress(), remoteUgi.getUserName(),
|
||||
container.getResource(), System.currentTimeMillis() + 100000l, 123,
|
||||
BuilderUtils.newContainerTokenIdentifier(
|
||||
container.getContainerToken()).getRMIdentifer());
|
||||
} catch (IOException e) {
|
||||
throw new YarnRemoteException(e);
|
||||
}
|
||||
protected ContainerTokenIdentifier
|
||||
getContainerTokenIdentifier(UserGroupInformation remoteUgi,
|
||||
ContainerTokenIdentifier containerTokenId) throws YarnRemoteException {
|
||||
return containerTokenId;
|
||||
}
|
||||
}
|
|
@ -18,9 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -30,7 +27,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -80,7 +76,12 @@ public class TestEventFlow {
|
|||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
Context context = new NMContext(new NMContainerTokenSecretManager(conf));
|
||||
Context context = new NMContext(new NMContainerTokenSecretManager(conf)) {
|
||||
@Override
|
||||
public int getHttpPort() {
|
||||
return 1234;
|
||||
}
|
||||
};
|
||||
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
|
@ -132,10 +133,7 @@ public class TestEventFlow {
|
|||
applicationAttemptId.setApplicationId(applicationId);
|
||||
applicationAttemptId.setAttemptId(0);
|
||||
cID.setApplicationAttemptId(applicationAttemptId);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cID);
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
String user = "testing";
|
||||
String host = "127.0.0.1";
|
||||
int port = 1234;
|
||||
|
@ -143,11 +141,10 @@ public class TestEventFlow {
|
|||
BuilderUtils.newContainerToken(cID, host, port, user, r,
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||
SIMULATED_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest request =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(launchContext);
|
||||
request.setContainer(mockContainer);
|
||||
request.setContainerToken(containerToken);
|
||||
containerManager.startContainer(request);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
|
||||
|
|
|
@ -185,10 +185,7 @@ public class TestLinuxContainerExecutor {
|
|||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String,String>();
|
||||
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(containerAPI);
|
||||
when(container.getContainer().getId()).thenReturn(cId);
|
||||
when(container.getContainerId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(context.getEnvironment()).thenReturn(env);
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -108,10 +109,7 @@ public class TestLinuxContainerExecutorWithMocks {
|
|||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String,String>();
|
||||
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(containerAPI);
|
||||
when(container.getContainer().getId()).thenReturn(cId);
|
||||
when(container.getContainerId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(cId.toString()).thenReturn(containerId);
|
||||
|
@ -229,10 +227,7 @@ public class TestLinuxContainerExecutorWithMocks {
|
|||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String, String>();
|
||||
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(containerAPI);
|
||||
when(container.getContainer().getId()).thenReturn(cId);
|
||||
when(container.getContainerId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(cId.toString()).thenReturn(containerId);
|
||||
|
|
|
@ -112,9 +112,6 @@ public class TestNodeManagerReboot {
|
|||
Records.newRecord(ContainerLaunchContext.class);
|
||||
// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
Records.newRecord(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
mockContainer.setId(cId);
|
||||
|
||||
URL localResourceUri =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
|
@ -136,20 +133,16 @@ public class TestNodeManagerReboot {
|
|||
containerLaunchContext.setCommands(commands);
|
||||
Resource resource = Records.newRecord(Resource.class);
|
||||
resource.setMemory(1024);
|
||||
mockContainer.setResource(resource);
|
||||
NodeId nodeId = BuilderUtils.newNodeId("127.0.0.1", 12345);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
|
||||
user, resource, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), 0);
|
||||
mockContainer.setContainerToken(containerToken);
|
||||
mockContainer.setNodeHttpAddress("127.0.0.1");
|
||||
mockContainer.setNodeId(nodeId);
|
||||
|
||||
final StartContainerRequest startRequest =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
final UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(cId.toString());
|
||||
currentUser.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileContext;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -43,7 +42,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -281,12 +279,10 @@ public class TestNodeManagerResync {
|
|||
try {
|
||||
while (!isStopped && numContainers < 10) {
|
||||
ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
||||
Container container =
|
||||
BuilderUtils.newContainer(cId, null, null, null, null, null);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(container);
|
||||
startRequest.setContainerToken(null);
|
||||
System.out.println("no. of containers to be launched: "
|
||||
+ numContainers);
|
||||
numContainers++;
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -56,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
|
@ -158,13 +156,8 @@ public class TestNodeManagerShutdown {
|
|||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
Container mockContainer = new ContainerPBImpl();
|
||||
|
||||
mockContainer.setId(cId);
|
||||
|
||||
NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
|
||||
mockContainer.setNodeId(nodeId);
|
||||
mockContainer.setNodeHttpAddress("localhost:12345");
|
||||
|
||||
URL localResourceUri =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
|
@ -184,16 +177,14 @@ public class TestNodeManagerShutdown {
|
|||
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||
mockContainer.setResource(resource);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, nodeId.getHost(), nodeId.getPort(),
|
||||
user, resource, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), 0);
|
||||
mockContainer.setContainerToken(containerToken);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(cId.toString());
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
|
@ -187,8 +188,6 @@ public class TestNodeStatusUpdater {
|
|||
nodeStatus.setResponseId(heartBeatID++);
|
||||
Map<ApplicationId, List<ContainerStatus>> appToContainers =
|
||||
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
|
||||
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
|
||||
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
|
||||
|
@ -202,12 +201,17 @@ public class TestNodeStatusUpdater {
|
|||
firstContainerID.setId(heartBeatID);
|
||||
ContainerLaunchContext launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
when(mockContainer.getId()).thenReturn(firstContainerID);
|
||||
Resource resource = BuilderUtils.newResource(2, 1);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
String user = "testUser";
|
||||
ContainerTokenIdentifier containerToken =
|
||||
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
|
||||
.newContainerToken(firstContainerID, "127.0.0.1", 1234, user,
|
||||
resource, currentTime + 10000, 123, "password".getBytes(),
|
||||
currentTime));
|
||||
Container container =
|
||||
new ContainerImpl(conf, mockDispatcher, launchContext,
|
||||
mockContainer, null, mockMetrics, null);
|
||||
new ContainerImpl(conf, mockDispatcher, launchContext, null,
|
||||
mockMetrics, containerToken);
|
||||
this.context.getContainers().put(firstContainerID, container);
|
||||
} else if (heartBeatID == 2) {
|
||||
// Checks on the RM end
|
||||
|
@ -227,12 +231,17 @@ public class TestNodeStatusUpdater {
|
|||
secondContainerID.setId(heartBeatID);
|
||||
ContainerLaunchContext launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
when(mockContainer.getId()).thenReturn(secondContainerID);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
String user = "testUser";
|
||||
Resource resource = BuilderUtils.newResource(3, 1);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
ContainerTokenIdentifier containerToken =
|
||||
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
|
||||
.newContainerToken(secondContainerID, "127.0.0.1", 1234, user,
|
||||
resource, currentTime + 10000, 123,
|
||||
"password".getBytes(), currentTime));
|
||||
Container container =
|
||||
new ContainerImpl(conf, mockDispatcher, launchContext,
|
||||
mockContainer, null, mockMetrics, null);
|
||||
new ContainerImpl(conf, mockDispatcher, launchContext, null,
|
||||
mockMetrics, containerToken);
|
||||
this.context.getContainers().put(secondContainerID, container);
|
||||
} else if (heartBeatID == 3) {
|
||||
// Checks on the RM end
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -94,9 +93,14 @@ public abstract class BaseContainerManagerTest {
|
|||
protected static Log LOG = LogFactory
|
||||
.getLog(BaseContainerManagerTest.class);
|
||||
|
||||
protected static final int HTTP_PORT = 5412;
|
||||
protected Configuration conf = new YarnConfiguration();
|
||||
protected Context context = new NMContext(new NMContainerTokenSecretManager(
|
||||
conf));
|
||||
conf)) {
|
||||
public int getHttpPort() {
|
||||
return HTTP_PORT;
|
||||
};
|
||||
};
|
||||
protected ContainerExecutor exec;
|
||||
protected DeletionService delSrvc;
|
||||
protected String user = "nobody";
|
||||
|
@ -177,9 +181,8 @@ public abstract class BaseContainerManagerTest {
|
|||
|
||||
@Override
|
||||
protected void authorizeRequest(String containerIDStr,
|
||||
ContainerLaunchContext launchContext, Container container,
|
||||
UserGroupInformation remoteUgi, ContainerTokenIdentifier tokenId)
|
||||
throws YarnRemoteException {
|
||||
ContainerLaunchContext launchContext, UserGroupInformation remoteUgi,
|
||||
ContainerTokenIdentifier tokenId) throws YarnRemoteException {
|
||||
// do nothing
|
||||
}
|
||||
};
|
||||
|
|
|
@ -18,9 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
|
@ -43,7 +40,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -142,24 +138,16 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
new HashMap<String, LocalResource>();
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
Resource r = BuilderUtils.newResource(512, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
|
@ -243,23 +231,16 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
containerLaunchContext.setLocalResources(localResources);
|
||||
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
Resource r = BuilderUtils.newResource(100, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r); // MB
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
|
@ -358,22 +339,15 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
containerLaunchContext.setLocalResources(localResources);
|
||||
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
Resource r = BuilderUtils.newResource(100, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r); // MB
|
||||
int port = 12345;
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
||||
|
@ -453,24 +427,16 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
new HashMap<String, LocalResource>();
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
Resource r = BuilderUtils.newResource(100, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(containerLaunchContext);
|
||||
request.setContainer(mockContainer);
|
||||
request.setContainerToken(containerToken);
|
||||
containerManager.startContainer(request);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
||||
|
@ -549,8 +515,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
.setLocalResources(new HashMap<String, LocalResource>());
|
||||
Resource mockResource = BuilderUtils.newResource(1024, 1);
|
||||
|
||||
Container mockContainer1 = mock(Container.class);
|
||||
when(mockContainer1.getId()).thenReturn(cId1);
|
||||
// Construct the Container with Invalid RMIdentifier
|
||||
StartContainerRequest startRequest1 =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
|
@ -560,8 +524,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes(),
|
||||
(long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
|
||||
when(mockContainer1.getContainerToken()).thenReturn(containerToken1);
|
||||
startRequest1.setContainer(mockContainer1);
|
||||
startRequest1.setContainerToken(containerToken1);
|
||||
boolean catchException = false;
|
||||
try {
|
||||
containerManager.startContainer(startRequest1);
|
||||
|
@ -579,10 +542,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
Assert.assertTrue(catchException);
|
||||
|
||||
// Construct the Container with a RMIdentifier within current RM
|
||||
Container mockContainer2 = mock(Container.class);
|
||||
when(mockContainer2.getId()).thenReturn(cId2);
|
||||
|
||||
when(mockContainer2.getResource()).thenReturn(mockResource);
|
||||
StartContainerRequest startRequest2 =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest2.setContainerLaunchContext(containerLaunchContext);
|
||||
|
@ -590,9 +549,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
BuilderUtils.newContainerToken(cId1, host, port, user, mockResource,
|
||||
System.currentTimeMillis() + 10000, 123, "password".getBytes(),
|
||||
super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer2.getContainerToken()).thenReturn(containerToken2);
|
||||
|
||||
startRequest2.setContainer(mockContainer2);
|
||||
startRequest2.setContainerToken(containerToken2);
|
||||
boolean noException = true;
|
||||
try {
|
||||
containerManager.startContainer(startRequest2);
|
||||
|
|
|
@ -91,7 +91,7 @@ public class TestApplication {
|
|||
for (int i = 0; i < wa.containers.size(); i++) {
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(i)
|
||||
.getContainer().getId())));
|
||||
.getContainerId())));
|
||||
}
|
||||
} finally {
|
||||
if (wa != null)
|
||||
|
@ -116,7 +116,7 @@ public class TestApplication {
|
|||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(0)
|
||||
.getContainer().getId())));
|
||||
.getContainerId())));
|
||||
|
||||
wa.initContainer(1);
|
||||
wa.initContainer(2);
|
||||
|
@ -126,7 +126,7 @@ public class TestApplication {
|
|||
for (int i = 1; i < wa.containers.size(); i++) {
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(i)
|
||||
.getContainer().getId())));
|
||||
.getContainerId())));
|
||||
}
|
||||
} finally {
|
||||
if (wa != null)
|
||||
|
@ -241,7 +241,7 @@ public class TestApplication {
|
|||
for (int i = 1; i < wa.containers.size(); i++) {
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(i)
|
||||
.getContainer().getId())));
|
||||
.getContainerId())));
|
||||
}
|
||||
|
||||
wa.containerFinished(1);
|
||||
|
@ -267,7 +267,7 @@ public class TestApplication {
|
|||
wa.appResourcesCleanedup();
|
||||
for ( Container container : wa.containers) {
|
||||
Assert.assertTrue(wa.context.getContainerTokenSecretManager()
|
||||
.isValidStartContainerRequest(container.getContainer().getId()));
|
||||
.isValidStartContainerRequest(container.getContainerId()));
|
||||
}
|
||||
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
||||
|
||||
|
@ -307,7 +307,7 @@ public class TestApplication {
|
|||
wa.appResourcesCleanedup();
|
||||
for ( Container container : wa.containers) {
|
||||
Assert.assertTrue(wa.context.getContainerTokenSecretManager()
|
||||
.isValidStartContainerRequest(container.getContainer().getId()));
|
||||
.isValidStartContainerRequest(container.getContainerId()));
|
||||
}
|
||||
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
|
||||
} finally {
|
||||
|
@ -370,7 +370,7 @@ public class TestApplication {
|
|||
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(0)
|
||||
.getContainer().getId())));
|
||||
.getContainerId())));
|
||||
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
wa.app.getApplicationState());
|
||||
|
||||
|
@ -487,10 +487,10 @@ public class TestApplication {
|
|||
containers.add(container);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
context.getContainerTokenSecretManager().startContainerSuccessful(
|
||||
new ContainerTokenIdentifier(container.getContainer().getId(), "",
|
||||
new ContainerTokenIdentifier(container.getContainerId(), "",
|
||||
"", null, currentTime + 1000, masterKey.getKeyId(), currentTime));
|
||||
Assert.assertFalse(context.getContainerTokenSecretManager()
|
||||
.isValidStartContainerRequest(container.getContainer().getId()));
|
||||
.isValidStartContainerRequest(container.getContainerId()));
|
||||
}
|
||||
|
||||
dispatcher.start();
|
||||
|
@ -522,7 +522,7 @@ public class TestApplication {
|
|||
|
||||
public void containerFinished(int containerNum) {
|
||||
app.handle(new ApplicationContainerFinishedEvent(containers.get(
|
||||
containerNum).getContainer().getId()));
|
||||
containerNum).getContainerId()));
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
|
@ -549,10 +549,7 @@ public class TestApplication {
|
|||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
|
||||
Container c = mock(Container.class);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(c.getContainer()).thenReturn(containerAPI);
|
||||
when(c.getContainer().getId()).thenReturn(cId);
|
||||
when(c.getContainerId()).thenReturn(cId);
|
||||
ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class);
|
||||
when(c.getLaunchContext()).thenReturn(launchContext);
|
||||
when(launchContext.getApplicationACLs()).thenReturn(
|
||||
|
|
|
@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
|
@ -377,7 +376,7 @@ public class TestContainer {
|
|||
public boolean matches(Object o) {
|
||||
ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
|
||||
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
|
||||
&& wcf.cId == evt.getContainer().getContainer().getId();
|
||||
&& wcf.cId == evt.getContainer().getContainerId();
|
||||
}
|
||||
};
|
||||
verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
|
||||
|
@ -526,13 +525,6 @@ public class TestContainer {
|
|||
return serviceData;
|
||||
}
|
||||
|
||||
private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx,
|
||||
org.apache.hadoop.yarn.api.records.Container container,
|
||||
ContainerTokenIdentifier identifier) throws IOException {
|
||||
return new ContainerImpl(conf, disp, ctx, container, null, metrics,
|
||||
identifier);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private class WrappedContainer {
|
||||
final DrainDispatcher dispatcher;
|
||||
|
@ -612,7 +604,7 @@ public class TestContainer {
|
|||
}
|
||||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||
|
||||
c = newContainer(dispatcher, ctxt, mockContainer, identifier);
|
||||
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier);
|
||||
dispatcher.start();
|
||||
}
|
||||
|
||||
|
@ -649,7 +641,7 @@ public class TestContainer {
|
|||
Path p = new Path(cache, rsrc.getKey());
|
||||
localPaths.put(p, Arrays.asList(rsrc.getKey()));
|
||||
// rsrc copied to p
|
||||
c.handle(new ContainerResourceLocalizedEvent(c.getContainer().getId(),
|
||||
c.handle(new ContainerResourceLocalizedEvent(c.getContainerId(),
|
||||
req, p));
|
||||
}
|
||||
drainDispatcherEvents();
|
||||
|
@ -672,8 +664,8 @@ public class TestContainer {
|
|||
LocalResource rsrc = localResources.get(rsrcKey);
|
||||
LocalResourceRequest req = new LocalResourceRequest(rsrc);
|
||||
Exception e = new Exception("Fake localization error");
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainer()
|
||||
.getId(), req, e.getMessage()));
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainerId(), req, e
|
||||
.getMessage()));
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
|
@ -688,7 +680,7 @@ public class TestContainer {
|
|||
++counter;
|
||||
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
|
||||
Exception e = new Exception("Fake localization error");
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainer().getId(),
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainerId(),
|
||||
req, e.getMessage()));
|
||||
}
|
||||
drainDispatcherEvents();
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
|
@ -47,7 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -163,7 +160,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
|
@ -174,11 +170,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
int port = 12345;
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
|
||||
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
||||
|
@ -242,15 +233,13 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 1234,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
|
@ -273,27 +262,20 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
BufferedReader reader =
|
||||
new BufferedReader(new FileReader(processStartFile));
|
||||
Assert.assertEquals(cId.toString(), reader.readLine());
|
||||
Assert.assertEquals(mockContainer.getNodeId().getHost(),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(
|
||||
String.valueOf(mockContainer.getNodeHttpAddress().split(":")[1]),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(context.getNodeId().getHost(), reader.readLine());
|
||||
Assert.assertEquals(String.valueOf(context.getNodeId().getPort()),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(String.valueOf(HTTP_PORT), reader.readLine());
|
||||
Assert.assertEquals(StringUtils.join(",", appDirs), reader.readLine());
|
||||
|
||||
Assert.assertEquals(cId.toString(), containerLaunchContext
|
||||
.getEnvironment().get(Environment.CONTAINER_ID.name()));
|
||||
Assert.assertEquals(mockContainer.getNodeId().getHost(),
|
||||
containerLaunchContext.getEnvironment()
|
||||
.get(Environment.NM_HOST.name()));
|
||||
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
|
||||
containerLaunchContext.getEnvironment().get(
|
||||
Environment.NM_PORT.name()));
|
||||
Assert.assertEquals(
|
||||
mockContainer.getNodeHttpAddress().split(":")[1],
|
||||
containerLaunchContext.getEnvironment().get(
|
||||
Environment.NM_HTTP_PORT.name()));
|
||||
Assert.assertEquals(context.getNodeId().getHost(), containerLaunchContext
|
||||
.getEnvironment().get(Environment.NM_HOST.name()));
|
||||
Assert.assertEquals(String.valueOf(context.getNodeId().getPort()),
|
||||
containerLaunchContext.getEnvironment().get(Environment.NM_PORT.name()));
|
||||
Assert.assertEquals(String.valueOf(HTTP_PORT), containerLaunchContext
|
||||
.getEnvironment().get(Environment.NM_HTTP_PORT.name()));
|
||||
Assert.assertEquals(StringUtils.join(",", appDirs), containerLaunchContext
|
||||
.getEnvironment().get(Environment.LOCAL_DIRS.name()));
|
||||
// Get the pid of the process
|
||||
|
@ -335,7 +317,6 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
public void testDelayedKill() throws Exception {
|
||||
containerManager.start();
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
|
@ -372,11 +353,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
|
||||
// upload the script file so that the container can run it
|
||||
URL resource_alpha =
|
||||
|
@ -399,15 +376,13 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
|
|
|
@ -495,7 +495,7 @@ public class TestResourceLocalizationService {
|
|||
Thread.sleep(1000);
|
||||
dispatcher.await();
|
||||
String appStr = ConverterUtils.toString(appId);
|
||||
String ctnrStr = c.getContainer().getId().toString();
|
||||
String ctnrStr = c.getContainerId().toString();
|
||||
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
|
||||
verify(exec).startLocalizer(tokenPathCaptor.capture(),
|
||||
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
|
||||
|
@ -571,7 +571,7 @@ public class TestResourceLocalizationService {
|
|||
public boolean matches(Object o) {
|
||||
ContainerEvent evt = (ContainerEvent) o;
|
||||
return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
|
||||
&& c.getContainer().getId() == evt.getContainerID();
|
||||
&& c.getContainerId() == evt.getContainerID();
|
||||
}
|
||||
};
|
||||
// total 2 resource localzation calls. one for each resource.
|
||||
|
@ -760,11 +760,11 @@ public class TestResourceLocalizationService {
|
|||
|
||||
// Container - 1
|
||||
ContainerImpl container1 = createMockContainer(user, 1);
|
||||
String localizerId1 = container1.getContainer().getId().toString();
|
||||
String localizerId1 = container1.getContainerId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId1,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container1
|
||||
.getContainer().getId(), null), localizerId1));
|
||||
.getContainerId(), null), localizerId1));
|
||||
LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
|
||||
|
||||
dispatcher1.getEventHandler().handle(
|
||||
|
@ -775,11 +775,11 @@ public class TestResourceLocalizationService {
|
|||
|
||||
// Container - 2 now makes the request.
|
||||
ContainerImpl container2 = createMockContainer(user, 2);
|
||||
String localizerId2 = container2.getContainer().getId().toString();
|
||||
String localizerId2 = container2.getContainerId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId2,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container2
|
||||
.getContainer().getId(), null), localizerId2));
|
||||
.getContainerId(), null), localizerId2));
|
||||
LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
|
||||
dispatcher1.getEventHandler().handle(
|
||||
createContainerLocalizationEvent(container2,
|
||||
|
@ -920,11 +920,11 @@ public class TestResourceLocalizationService {
|
|||
|
||||
// Container - 1
|
||||
Container container1 = createMockContainer(user, 1);
|
||||
String localizerId1 = container1.getContainer().getId().toString();
|
||||
String localizerId1 = container1.getContainerId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId1,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container1
|
||||
.getContainer().getId(), null), localizerId1));
|
||||
.getContainerId(), null), localizerId1));
|
||||
|
||||
// Creating two requests for container
|
||||
// 1) Private resource
|
||||
|
@ -1317,10 +1317,7 @@ public class TestResourceLocalizationService {
|
|||
|
||||
private ContainerImpl createMockContainer(String user, int containerId) {
|
||||
ContainerImpl container = mock(ContainerImpl.class);
|
||||
org.apache.hadoop.yarn.api.records.Container c =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(c);
|
||||
when(container.getContainer().getId()).thenReturn(
|
||||
when(container.getContainerId()).thenReturn(
|
||||
BuilderUtils.newContainerId(1, 1, 1, containerId));
|
||||
when(container.getUser()).thenReturn(user);
|
||||
Credentials mockCredentials = mock(Credentials.class);
|
||||
|
@ -1360,11 +1357,8 @@ public class TestResourceLocalizationService {
|
|||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(c.getContainer()).thenReturn(containerAPI);
|
||||
when(c.getUser()).thenReturn("user0");
|
||||
when(c.getContainer().getId()).thenReturn(cId);
|
||||
when(c.getContainerId()).thenReturn(cId);
|
||||
Credentials creds = new Credentials();
|
||||
creds.addToken(new Text("tok" + id), getToken(id));
|
||||
when(c.getCredentials()).thenReturn(creds);
|
||||
|
|
|
@ -55,10 +55,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
|
@ -707,15 +707,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
|
||||
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||
|
@ -736,15 +733,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Resource r = BuilderUtils.newResource(100 * 1024 * 1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
when(mockContainer.getContainerToken()).thenReturn(
|
||||
BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||
super.DUMMY_RM_IDENTIFIER));
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(cId, "127.0.0.1", 1234, user, r,
|
||||
System.currentTimeMillis() + 10000L, 123, "password".getBytes(),
|
||||
super.DUMMY_RM_IDENTIFIER);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
this.containerManager.startContainer(startRequest);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(this.containerManager,
|
||||
|
|
|
@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
|
@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -202,7 +200,6 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
|
@ -212,12 +209,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
|
||||
cId.setId(0);
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
int port = 12345;
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":" + port);
|
||||
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
|
@ -239,7 +231,6 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
Resource r = BuilderUtils.newResource(8 * 1024 * 1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(r);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
@ -247,8 +238,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
BuilderUtils.newContainerToken(cId, context.getNodeId().getHost(),
|
||||
port, user, r, System.currentTimeMillis() + 10000L, 123,
|
||||
"password".getBytes(), super.DUMMY_RM_IDENTIFIER);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
startRequest.setContainer(mockContainer);
|
||||
startRequest.setContainerToken(containerToken);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
|
|
|
@ -18,9 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -34,10 +31,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
|
@ -52,7 +50,7 @@ public class MockContainer implements Container {
|
|||
private final Map<Path, List<String>> resource =
|
||||
new HashMap<Path, List<String>>();
|
||||
private RecordFactory recordFactory;
|
||||
private org.apache.hadoop.yarn.api.records.Container mockContainer;
|
||||
private final ContainerTokenIdentifier containerTokenIdentifier;
|
||||
|
||||
public MockContainer(ApplicationAttemptId appAttemptId,
|
||||
Dispatcher dispatcher, Configuration conf, String user,
|
||||
|
@ -65,15 +63,12 @@ public class MockContainer implements Container {
|
|||
this.launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(id, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000, 123,
|
||||
"password".getBytes(), currentTime);
|
||||
this.containerTokenIdentifier =
|
||||
BuilderUtils.newContainerTokenIdentifier(BuilderUtils
|
||||
.newContainerToken(id, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000, 123,
|
||||
"password".getBytes(), currentTime));
|
||||
this.state = ContainerState.NEW;
|
||||
|
||||
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
when(mockContainer.getId()).thenReturn(id);
|
||||
}
|
||||
|
||||
public void setState(ContainerState state) {
|
||||
|
@ -126,7 +121,17 @@ public class MockContainer implements Container {
|
|||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.yarn.api.records.Container getContainer() {
|
||||
return this.mockContainer;
|
||||
public ContainerId getContainerId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
return this.containerTokenIdentifier.getResource();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerTokenIdentifier getContainerTokenIdentifier() {
|
||||
return this.containerTokenIdentifier;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import java.io.FileWriter;
|
|||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -76,7 +75,7 @@ public class TestNMWebServer {
|
|||
FileUtil.fullyDelete(testLogDir);
|
||||
}
|
||||
|
||||
private String startNMWebAppServer(String webAddr) {
|
||||
private int startNMWebAppServer(String webAddr) {
|
||||
Context nmContext = new NodeManager.NMContext(null);
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
|
@ -107,20 +106,19 @@ public class TestNMWebServer {
|
|||
new ApplicationACLsManager(conf), dirsHandler);
|
||||
server.init(conf);
|
||||
server.start();
|
||||
String webAppAddr = conf.get(YarnConfiguration.NM_WEBAPP_ADDRESS);
|
||||
return StringUtils.split(webAppAddr, ':')[1];
|
||||
return server.getPort();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNMWebAppWithOutPort() throws IOException {
|
||||
String port = startNMWebAppServer("0.0.0.0");
|
||||
Assert.assertTrue("Port is not updated", Integer.parseInt(port) > 0);
|
||||
int port = startNMWebAppServer("0.0.0.0");
|
||||
Assert.assertTrue("Port is not updated", port > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNMWebAppWithEphemeralPort() throws IOException {
|
||||
String port = startNMWebAppServer("0.0.0.0:0");
|
||||
Assert.assertTrue("Port is not updated", Integer.parseInt(port) > 0);
|
||||
int port = startNMWebAppServer("0.0.0.0:0");
|
||||
Assert.assertTrue("Port is not updated", port > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -180,17 +178,13 @@ public class TestNMWebServer {
|
|||
// TODO: Use builder utils
|
||||
ContainerLaunchContext launchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
long currentTime = System.currentTimeMillis();
|
||||
ContainerToken containerToken =
|
||||
BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user,
|
||||
BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123,
|
||||
"password".getBytes(), currentTime);
|
||||
when(mockContainer.getContainerToken()).thenReturn(containerToken);
|
||||
when(mockContainer.getId()).thenReturn(containerId);
|
||||
Container container =
|
||||
new ContainerImpl(conf, dispatcher, launchContext, mockContainer,
|
||||
new ContainerImpl(conf, dispatcher, launchContext,
|
||||
null, metrics,
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
|
||||
|
||||
|
|
|
@ -188,17 +188,17 @@ public class TestNMWebServicesApps extends JerseyTest {
|
|||
Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
|
||||
app.getUser(), app.getAppId(), 2);
|
||||
nmContext.getContainers()
|
||||
.put(container1.getContainer().getId(), container1);
|
||||
.put(container1.getContainerId(), container1);
|
||||
nmContext.getContainers()
|
||||
.put(container2.getContainer().getId(), container2);
|
||||
.put(container2.getContainerId(), container2);
|
||||
|
||||
app.getContainers().put(container1.getContainer().getId(), container1);
|
||||
app.getContainers().put(container2.getContainer().getId(), container2);
|
||||
app.getContainers().put(container1.getContainerId(), container1);
|
||||
app.getContainers().put(container2.getContainerId(), container2);
|
||||
HashMap<String, String> hash = new HashMap<String, String>();
|
||||
hash.put(container1.getContainer().getId().toString(), container1
|
||||
.getContainer().getId().toString());
|
||||
hash.put(container2.getContainer().getId().toString(), container2
|
||||
.getContainer().getId().toString());
|
||||
hash.put(container1.getContainerId().toString(), container1
|
||||
.getContainerId().toString());
|
||||
hash.put(container2.getContainerId().toString(), container2
|
||||
.getContainerId().toString());
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
|
@ -92,9 +93,15 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
private Injector injector = Guice.createInjector(new ServletModule() {
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
nmContext = new NodeManager.NMContext(null);
|
||||
nmContext.getNodeId().setHost("testhost.foo.com");
|
||||
nmContext.getNodeId().setPort(8042);
|
||||
nmContext = new NodeManager.NMContext(null) {
|
||||
public NodeId getNodeId() {
|
||||
return NodeId.newInstance("testhost.foo.com", 8042);
|
||||
};
|
||||
|
||||
public int getHttpPort() {
|
||||
return 1234;
|
||||
};
|
||||
};
|
||||
resourceView = new ResourceView() {
|
||||
@Override
|
||||
public long getVmemAllocatedForContainers() {
|
||||
|
@ -189,17 +196,17 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
|
||||
app.getUser(), app.getAppId(), 2);
|
||||
nmContext.getContainers()
|
||||
.put(container1.getContainer().getId(), container1);
|
||||
.put(container1.getContainerId(), container1);
|
||||
nmContext.getContainers()
|
||||
.put(container2.getContainer().getId(), container2);
|
||||
.put(container2.getContainerId(), container2);
|
||||
|
||||
app.getContainers().put(container1.getContainer().getId(), container1);
|
||||
app.getContainers().put(container2.getContainer().getId(), container2);
|
||||
app.getContainers().put(container1.getContainerId(), container1);
|
||||
app.getContainers().put(container2.getContainerId(), container2);
|
||||
HashMap<String, String> hash = new HashMap<String, String>();
|
||||
hash.put(container1.getContainer().getId().toString(), container1
|
||||
.getContainer().getId().toString());
|
||||
hash.put(container2.getContainer().getId().toString(), container2
|
||||
.getContainer().getId().toString());
|
||||
hash.put(container1.getContainerId().toString(), container1
|
||||
.getContainerId().toString());
|
||||
hash.put(container2.getContainerId().toString(), container2
|
||||
.getContainerId().toString());
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
@ -472,7 +479,7 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
String state, String user, int exitCode, String diagnostics,
|
||||
String nodeId, int totalMemoryNeededMB, String logsLink)
|
||||
throws JSONException, Exception {
|
||||
WebServicesTestUtils.checkStringMatch("id", cont.getContainer().getId()
|
||||
WebServicesTestUtils.checkStringMatch("id", cont.getContainerId()
|
||||
.toString(), id);
|
||||
WebServicesTestUtils.checkStringMatch("state", cont.getContainerState()
|
||||
.toString(), state);
|
||||
|
@ -484,9 +491,11 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
|
||||
WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId()
|
||||
.toString(), nodeId);
|
||||
assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB);
|
||||
assertEquals("totalMemoryNeededMB wrong",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
totalMemoryNeededMB);
|
||||
String shortLink =
|
||||
ujoin("containerlogs", cont.getContainer().getId().toString(),
|
||||
ujoin("containerlogs", cont.getContainerId().toString(),
|
||||
cont.getUser());
|
||||
assertTrue("containerLogsLink wrong", logsLink.contains(shortLink));
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ public class AMLauncher implements Runnable {
|
|||
StartContainerRequest request =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(launchContext);
|
||||
request.setContainer(masterContainer);
|
||||
request.setContainerToken(masterContainer.getContainerToken());
|
||||
containerMgrProxy.startContainer(request);
|
||||
LOG.info("Done launching container " + masterContainer
|
||||
+ " for AM " + application.getAppAttemptId());
|
||||
|
|
|
@ -342,7 +342,7 @@ public class Application {
|
|||
// Launch the container
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(createCLC());
|
||||
startRequest.setContainer(container);
|
||||
startRequest.setContainerToken(container.getContainerToken());
|
||||
nodeManager.startContainer(startRequest);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
|
@ -161,11 +163,19 @@ public class NodeManager implements ContainerManager {
|
|||
synchronized public StartContainerResponse startContainer(
|
||||
StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
Container requestContainer = request.getContainer();
|
||||
|
||||
ContainerToken containerToken = request.getContainerToken();
|
||||
ContainerTokenIdentifier tokenId = null;
|
||||
|
||||
try {
|
||||
tokenId = BuilderUtils.newContainerTokenIdentifier(containerToken);
|
||||
} catch (IOException e) {
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
|
||||
ContainerId containerID = tokenId.getContainerID();
|
||||
ApplicationId applicationId =
|
||||
requestContainer.getId().getApplicationAttemptId().
|
||||
getApplicationId();
|
||||
containerID.getApplicationAttemptId().getApplicationId();
|
||||
|
||||
List<Container> applicationContainers = containers.get(applicationId);
|
||||
if (applicationContainers == null) {
|
||||
|
@ -175,18 +185,18 @@ public class NodeManager implements ContainerManager {
|
|||
|
||||
// Sanity check
|
||||
for (Container container : applicationContainers) {
|
||||
if (container.getId().compareTo(requestContainer.getId())
|
||||
if (container.getId().compareTo(containerID)
|
||||
== 0) {
|
||||
throw new IllegalStateException(
|
||||
"Container " + requestContainer.getId() +
|
||||
"Container " + containerID +
|
||||
" already setup on node " + containerManagerAddress);
|
||||
}
|
||||
}
|
||||
|
||||
Container container =
|
||||
BuilderUtils.newContainer(requestContainer.getId(),
|
||||
BuilderUtils.newContainer(containerID,
|
||||
this.nodeId, nodeHttpAddress,
|
||||
requestContainer.getResource(),
|
||||
tokenId.getResource(),
|
||||
null, null // DKDC - Doesn't matter
|
||||
);
|
||||
|
||||
|
@ -195,8 +205,8 @@ public class NodeManager implements ContainerManager {
|
|||
"", -1000);
|
||||
applicationContainers.add(container);
|
||||
containerStatusMap.put(container, containerStatus);
|
||||
Resources.subtractFrom(available, requestContainer.getResource());
|
||||
Resources.addTo(used, requestContainer.getResource());
|
||||
Resources.subtractFrom(available, tokenId.getResource());
|
||||
Resources.addTo(used, tokenId.getResource());
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("startContainer:" + " node=" + containerManagerAddress
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -33,11 +34,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -57,8 +62,6 @@ public class TestApplicationMasterLauncher {
|
|||
String attemptIdAtContainerManager = null;
|
||||
String containerIdAtContainerManager = null;
|
||||
String nmHostAtContainerManager = null;
|
||||
int nmPortAtContainerManager;
|
||||
int nmHttpPortAtContainerManager;
|
||||
long submitTimeAtContainerManager;
|
||||
int maxAppAttempts;
|
||||
|
||||
|
@ -70,17 +73,21 @@ public class TestApplicationMasterLauncher {
|
|||
launched = true;
|
||||
Map<String, String> env =
|
||||
request.getContainerLaunchContext().getEnvironment();
|
||||
ContainerId containerId =
|
||||
request.getContainer().getId();
|
||||
|
||||
ContainerToken containerToken = request.getContainerToken();
|
||||
ContainerTokenIdentifier tokenId = null;
|
||||
|
||||
try {
|
||||
tokenId = BuilderUtils.newContainerTokenIdentifier(containerToken);
|
||||
} catch (IOException e) {
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
|
||||
ContainerId containerId = tokenId.getContainerID();
|
||||
containerIdAtContainerManager = containerId.toString();
|
||||
attemptIdAtContainerManager =
|
||||
containerId.getApplicationAttemptId().toString();
|
||||
nmHostAtContainerManager = request.getContainer().getNodeId().getHost();
|
||||
nmPortAtContainerManager =
|
||||
request.getContainer().getNodeId().getPort();
|
||||
nmHttpPortAtContainerManager =
|
||||
Integer.parseInt(request.getContainer().getNodeHttpAddress()
|
||||
.split(":")[1]);
|
||||
nmHostAtContainerManager = tokenId.getNmHostAddress();
|
||||
submitTimeAtContainerManager =
|
||||
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
|
||||
maxAppAttempts =
|
||||
|
@ -135,12 +142,8 @@ public class TestApplicationMasterLauncher {
|
|||
Assert.assertEquals(app.getRMAppAttempt(appAttemptId)
|
||||
.getMasterContainer().getId()
|
||||
.toString(), containerManager.containerIdAtContainerManager);
|
||||
Assert.assertEquals(nm1.getNodeId().getHost(),
|
||||
containerManager.nmHostAtContainerManager);
|
||||
Assert.assertEquals(nm1.getNodeId().getPort(),
|
||||
containerManager.nmPortAtContainerManager);
|
||||
Assert.assertEquals(nm1.getHttpPort(),
|
||||
containerManager.nmHttpPortAtContainerManager);
|
||||
Assert.assertEquals(nm1.getNodeId().toString(),
|
||||
containerManager.nmHostAtContainerManager);
|
||||
Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
|
||||
containerManager.maxAppAttempts);
|
||||
|
||||
|
|
|
@ -38,9 +38,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -49,7 +47,6 @@ import org.apache.hadoop.util.Shell;
|
|||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
|
@ -121,8 +118,8 @@ public class TestContainerManagerSecurity {
|
|||
// Testing for malicious user
|
||||
testMaliceUser();
|
||||
|
||||
// Testing for unauthorized user
|
||||
testUnauthorizedUser();
|
||||
// Testing for usage of expired tokens
|
||||
testExpiredTokens();
|
||||
|
||||
} finally {
|
||||
if (yarnCluster != null) {
|
||||
|
@ -184,6 +181,15 @@ public class TestContainerManagerSecurity {
|
|||
resourceManager.getClientRMService().forceKillApplication(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* This tests a malice user getting a proper token but then messing with it by
|
||||
* tampering with containerID/Resource etc.. His/her containers should be
|
||||
* rejected.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private void testMaliceUser() throws IOException, InterruptedException,
|
||||
YarnRemoteException {
|
||||
|
||||
|
@ -205,30 +211,60 @@ public class TestContainerManagerSecurity {
|
|||
appID);
|
||||
|
||||
// Now talk to the NM for launching the container with modified resource
|
||||
final ContainerId containerID = allocatedContainer.getId();
|
||||
UserGroupInformation maliceUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
|
||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||
byte[] identifierBytes = containerToken.getIdentifier().array();
|
||||
|
||||
DataInputBuffer di = new DataInputBuffer();
|
||||
di.reset(identifierBytes, identifierBytes.length);
|
||||
|
||||
ContainerTokenIdentifier dummyIdentifier = new ContainerTokenIdentifier();
|
||||
dummyIdentifier.readFields(di);
|
||||
ContainerTokenIdentifier originalContainerTokenId =
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken);
|
||||
|
||||
// Malice user modifies the resource amount
|
||||
Resource modifiedResource = BuilderUtils.newResource(2048, 1);
|
||||
ContainerTokenIdentifier modifiedIdentifier =
|
||||
new ContainerTokenIdentifier(dummyIdentifier.getContainerID(),
|
||||
dummyIdentifier.getNmHostAddress(), "testUser", modifiedResource,
|
||||
Long.MAX_VALUE, dummyIdentifier.getMasterKeyId(),
|
||||
new ContainerTokenIdentifier(originalContainerTokenId.getContainerID(),
|
||||
originalContainerTokenId.getNmHostAddress(), "testUser",
|
||||
modifiedResource, Long.MAX_VALUE,
|
||||
originalContainerTokenId.getMasterKeyId(),
|
||||
ResourceManager.clusterTimeStamp);
|
||||
Token<ContainerTokenIdentifier> modifiedToken = new Token<ContainerTokenIdentifier>(
|
||||
modifiedIdentifier.getBytes(), containerToken.getPassword().array(),
|
||||
new Text(containerToken.getKind()), new Text(containerToken
|
||||
.getService()));
|
||||
Token<ContainerTokenIdentifier> modifiedToken =
|
||||
new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
|
||||
containerToken.getPassword().array(), new Text(
|
||||
containerToken.getKind()), new Text(containerToken.getService()));
|
||||
makeTamperedStartContainerCall(yarnRPC, allocatedContainer,
|
||||
modifiedIdentifier, modifiedToken);
|
||||
|
||||
// Malice user modifies the container-Id
|
||||
ContainerId newContainerId =
|
||||
BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(originalContainerTokenId
|
||||
.getContainerID().getApplicationAttemptId().getApplicationId(), 1),
|
||||
originalContainerTokenId.getContainerID().getId() + 42);
|
||||
modifiedIdentifier =
|
||||
new ContainerTokenIdentifier(newContainerId,
|
||||
originalContainerTokenId.getNmHostAddress(), "testUser",
|
||||
originalContainerTokenId.getResource(), Long.MAX_VALUE,
|
||||
originalContainerTokenId.getMasterKeyId(),
|
||||
ResourceManager.clusterTimeStamp);
|
||||
modifiedToken =
|
||||
new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
|
||||
containerToken.getPassword().array(), new Text(
|
||||
containerToken.getKind()), new Text(containerToken.getService()));
|
||||
makeTamperedStartContainerCall(yarnRPC, allocatedContainer,
|
||||
modifiedIdentifier, modifiedToken);
|
||||
|
||||
// Similarly messing with anything else will fail.
|
||||
|
||||
KillApplicationRequest request = Records
|
||||
.newRecord(KillApplicationRequest.class);
|
||||
request.setApplicationId(appID);
|
||||
resourceManager.getClientRMService().forceKillApplication(request);
|
||||
}
|
||||
|
||||
private void makeTamperedStartContainerCall(final YarnRPC yarnRPC,
|
||||
final Container allocatedContainer,
|
||||
final ContainerTokenIdentifier modifiedIdentifier,
|
||||
Token<ContainerTokenIdentifier> modifiedToken) {
|
||||
final ContainerId containerID = allocatedContainer.getId();
|
||||
UserGroupInformation maliceUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
maliceUser.addToken(modifiedToken);
|
||||
maliceUser.doAs(new PrivilegedAction<Void>() {
|
||||
@Override
|
||||
|
@ -239,11 +275,14 @@ public class TestContainerManagerSecurity {
|
|||
conf);
|
||||
|
||||
LOG.info("Going to contact NM: ilLegal request");
|
||||
GetContainerStatusRequest request = recordFactory
|
||||
.newRecordInstance(GetContainerStatusRequest.class);
|
||||
request.setContainerId(containerID);
|
||||
StartContainerRequest request =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
try {
|
||||
client.getContainerStatus(request);
|
||||
request.setContainerToken(allocatedContainer.getContainerToken());
|
||||
ContainerLaunchContext context =
|
||||
createContainerLaunchContextForTest(modifiedIdentifier);
|
||||
request.setContainerLaunchContext(context);
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with illegally modified "
|
||||
+ "tokens is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
|
@ -263,14 +302,9 @@ public class TestContainerManagerSecurity {
|
|||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
KillApplicationRequest request = Records
|
||||
.newRecord(KillApplicationRequest.class);
|
||||
request.setApplicationId(appID);
|
||||
resourceManager.getClientRMService().forceKillApplication(request);
|
||||
}
|
||||
|
||||
private void testUnauthorizedUser() throws IOException, InterruptedException,
|
||||
private void testExpiredTokens() throws IOException, InterruptedException,
|
||||
YarnRemoteException {
|
||||
|
||||
LOG.info("\n\nRunning test for malice user");
|
||||
|
@ -293,48 +327,12 @@ public class TestContainerManagerSecurity {
|
|||
// Now talk to the NM for launching the container with modified containerID
|
||||
final ContainerId containerID = allocatedContainer.getId();
|
||||
|
||||
/////////// Test calls with illegal containerIDs and illegal Resources
|
||||
UserGroupInformation unauthorizedUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
ContainerToken containerToken = allocatedContainer.getContainerToken();
|
||||
|
||||
byte[] identifierBytes = containerToken.getIdentifier().array();
|
||||
DataInputBuffer di = new DataInputBuffer();
|
||||
di.reset(identifierBytes, identifierBytes.length);
|
||||
final ContainerTokenIdentifier tokenId = new ContainerTokenIdentifier();
|
||||
tokenId.readFields(di);
|
||||
|
||||
Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
|
||||
identifierBytes, containerToken.getPassword().array(), new Text(
|
||||
containerToken.getKind()), new Text(containerToken.getService()));
|
||||
|
||||
unauthorizedUser.addToken(token);
|
||||
ContainerManager client =
|
||||
unauthorizedUser.doAs(new PrivilegedAction<ContainerManager>() {
|
||||
@Override
|
||||
public ContainerManager run() {
|
||||
ContainerManager client = (ContainerManager) yarnRPC.getProxy(
|
||||
ContainerManager.class, NetUtils
|
||||
.createSocketAddr(allocatedContainer.getNodeId().toString()),
|
||||
conf);
|
||||
|
||||
LOG.info("Going to contact NM: unauthorized request");
|
||||
|
||||
callWithIllegalContainerID(client, tokenId, allocatedContainer);
|
||||
callWithIllegalResource(client, tokenId, allocatedContainer);
|
||||
// UserName is no longer sent using containerLaunchContext.
|
||||
// callWithIllegalUserName(client, tokenId, allocatedContainer);
|
||||
|
||||
return client;
|
||||
}
|
||||
});
|
||||
|
||||
// ///////// End of testing for illegal containerIDs, illegal Resources and
|
||||
// illegal users
|
||||
final ContainerTokenIdentifier tokenId =
|
||||
BuilderUtils.newContainerTokenIdentifier(containerToken);
|
||||
|
||||
/////////// Test calls with expired tokens
|
||||
RPC.stopProxy(client);
|
||||
unauthorizedUser = UserGroupInformation
|
||||
UserGroupInformation unauthorizedUser = UserGroupInformation
|
||||
.createRemoteUser(containerID.toString());
|
||||
|
||||
RMContainerTokenSecretManager containerTokenSecreteManager =
|
||||
|
@ -349,9 +347,10 @@ public class TestContainerManagerSecurity {
|
|||
containerTokenSecreteManager.createPassword(
|
||||
newTokenId);
|
||||
// Create a valid token by using the key from the RM.
|
||||
token = new Token<ContainerTokenIdentifier>(
|
||||
newTokenId.getBytes(), passowrd, new Text(
|
||||
containerToken.getKind()), new Text(containerToken.getService()));
|
||||
Token<ContainerTokenIdentifier> token =
|
||||
new Token<ContainerTokenIdentifier>(newTokenId.getBytes(), passowrd,
|
||||
new Text(containerToken.getKind()), new Text(
|
||||
containerToken.getService()));
|
||||
|
||||
unauthorizedUser.addToken(token);
|
||||
unauthorizedUser.doAs(new PrivilegedAction<Void>() {
|
||||
|
@ -369,7 +368,7 @@ public class TestContainerManagerSecurity {
|
|||
request.setContainerLaunchContext(context);
|
||||
allocatedContainer.setContainerToken(BuilderUtils.newContainerToken(
|
||||
allocatedContainer.getNodeId(), passowrd, newTokenId));
|
||||
request.setContainer(allocatedContainer);
|
||||
request.setContainerToken(allocatedContainer.getContainerToken());
|
||||
|
||||
//Calling startContainer with an expired token.
|
||||
try {
|
||||
|
@ -524,93 +523,6 @@ public class TestContainerManagerSecurity {
|
|||
return allocatedContainers.get(0);
|
||||
}
|
||||
|
||||
void callWithIllegalContainerID(ContainerManager client,
|
||||
ContainerTokenIdentifier tokenId, Container container) {
|
||||
StartContainerRequest request = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
ContainerLaunchContext context =
|
||||
createContainerLaunchContextForTest(tokenId);
|
||||
ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils
|
||||
.newApplicationAttemptId(tokenId.getContainerID()
|
||||
.getApplicationAttemptId().getApplicationId(), 1), 42);
|
||||
ContainerId oldContainerId = container.getId();
|
||||
try {
|
||||
container.setId(newContainerId);
|
||||
request.setContainer(container);
|
||||
request.setContainerLaunchContext(context);
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
+ "access is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception : ", e);
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Unauthorized request to start container. "
|
||||
+ "\nExpected containerId: " + tokenId.getContainerID()
|
||||
+ " Found: " + newContainerId.toString()));
|
||||
} catch (IOException e) {
|
||||
LOG.info("Got IOException: ",e);
|
||||
fail("IOException is not expected.");
|
||||
} finally {
|
||||
container.setId(oldContainerId);
|
||||
}
|
||||
}
|
||||
|
||||
void callWithIllegalResource(ContainerManager client,
|
||||
ContainerTokenIdentifier tokenId, Container container) {
|
||||
StartContainerRequest request = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
// Authenticated but unauthorized, due to wrong resource
|
||||
ContainerLaunchContext context =
|
||||
createContainerLaunchContextForTest(tokenId);
|
||||
Resource rsrc = container.getResource();
|
||||
container.setResource(BuilderUtils.newResource(2048, 1));
|
||||
request.setContainerLaunchContext(context);
|
||||
request.setContainer(container);
|
||||
try {
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
+ "access is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception : ", e);
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Unauthorized request to start container. "));
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"\nExpected resource " + tokenId.getResource().toString()
|
||||
+ " but found " + container.getResource().toString()));
|
||||
} catch (IOException e) {
|
||||
LOG.info("Got IOException: ",e);
|
||||
fail("IOException is not expected.");
|
||||
}
|
||||
container.setResource(rsrc);
|
||||
}
|
||||
|
||||
void callWithIllegalUserName(ContainerManager client,
|
||||
ContainerTokenIdentifier tokenId, Container container) {
|
||||
StartContainerRequest request = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
// Authenticated but unauthorized, due to wrong resource
|
||||
ContainerLaunchContext context =
|
||||
createContainerLaunchContextForTest(tokenId);
|
||||
String user = "invalidUser";
|
||||
request.setContainerLaunchContext(context);
|
||||
request.setContainer(container);
|
||||
try {
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
+ "access is expected to fail.");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception : ", e);
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Unauthorized request to start container. "));
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Expected user-name " + tokenId.getApplicationSubmitter()
|
||||
+ " but found " + user));
|
||||
} catch (IOException e) {
|
||||
LOG.info("Got IOException: ",e);
|
||||
fail("IOException is not expected.");
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerLaunchContext createContainerLaunchContextForTest(
|
||||
ContainerTokenIdentifier tokenId) {
|
||||
ContainerLaunchContext context =
|
||||
|
|
Loading…
Reference in New Issue