YARN-561. Modified NodeManager to set key information into the environment of every container that it launches. Contributed by Xuan Gong.

MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs anyways after YARN-561. Contributed by Xuan Gong.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-23 21:39:51 +00:00
parent 631be95398
commit 27e8c86999
35 changed files with 303 additions and 233 deletions

View File

@ -206,6 +206,9 @@ Release 2.0.5-beta - UNRELEASED
instead of extracting and populating information itself to start any instead of extracting and populating information itself to start any
container. (vinodkv) container. (vinodkv)
MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
anyways after YARN-561. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
/** /**
@ -280,7 +281,7 @@ public class LocalContainerLauncher extends AbstractService implements
// Use the AM's local dir env to generate the intermediate step // Use the AM's local dir env to generate the intermediate step
// output files // output files
String[] localSysDirs = StringUtils.getTrimmedStrings( String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(ApplicationConstants.LOCAL_DIR_ENV)); System.getenv(Environment.LOCAL_DIRS.name()));
conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs); conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
LOG.info(MRConfig.LOCAL_DIR + " for uber task: " LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ conf.get(MRConfig.LOCAL_DIR)); + conf.get(MRConfig.LOCAL_DIR));

View File

@ -111,8 +111,6 @@ public class MapReduceChildJVM {
MRJobConfig.STDERR_LOGFILE_ENV, MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR) getTaskLogFile(TaskLog.LogName.STDERR)
); );
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
} }
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) { private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

View File

@ -57,6 +57,9 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
/** /**
@ -216,7 +219,7 @@ class YarnChild {
*/ */
private static void configureLocalDirs(Task task, JobConf job) throws IOException { private static void configureLocalDirs(Task task, JobConf job) throws IOException {
String[] localSysDirs = StringUtils.getTrimmedStrings( String[] localSysDirs = StringUtils.getTrimmedStrings(
System.getenv(ApplicationConstants.LOCAL_DIR_ENV)); System.getenv(Environment.LOCAL_DIRS.name()));
job.setStrings(MRConfig.LOCAL_DIR, localSysDirs); job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR)); LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR); LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
@ -256,12 +259,14 @@ class YarnChild {
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE); final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials); job.setCredentials(credentials);
String appAttemptIdEnv = System ApplicationAttemptId appAttemptId =
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV); ConverterUtils.toContainerId(
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv); System.getenv(Environment.CONTAINER_ID.name()))
.getApplicationAttemptId();
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptId);
// Set it in conf, so as to be able to be used the the OutputCommitter. // Set it in conf, so as to be able to be used the the OutputCommitter.
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
.parseInt(appAttemptIdEnv)); appAttemptId.getAttemptId());
// set tcp nodelay // set tcp nodelay
job.setBoolean("ipc.client.tcpnodelay", true); job.setBoolean("ipc.client.tcpnodelay", true);

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -1270,22 +1271,22 @@ public class MRAppMaster extends CompositeService {
try { try {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
String containerIdStr = String containerIdStr =
System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV); System.getenv(Environment.CONTAINER_ID.name());
String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV); String nodeHostString = System.getenv(Environment.NM_HOST.name());
String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV); String nodePortString = System.getenv(Environment.NM_PORT.name());
String nodeHttpPortString = String nodeHttpPortString =
System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV); System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr = String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
String maxAppAttempts = String maxAppAttempts =
System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV); System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
validateInputParam(containerIdStr, validateInputParam(containerIdStr,
ApplicationConstants.AM_CONTAINER_ID_ENV); Environment.CONTAINER_ID.name());
validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV); validateInputParam(nodeHostString, Environment.NM_HOST.name());
validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV); validateInputParam(nodePortString, Environment.NM_PORT.name());
validateInputParam(nodeHttpPortString, validateInputParam(nodeHttpPortString,
ApplicationConstants.NM_HTTP_PORT_ENV); Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr, validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV); ApplicationConstants.APP_SUBMIT_TIME_ENV);
validateInputParam(maxAppAttempts, validateInputParam(maxAppAttempts,

View File

@ -569,8 +569,6 @@ public interface MRJobConfig {
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV"; public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV"; public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
// This should be the directory where splits file gets localized on the node // This should be the directory where splits file gets localized on the node
// running ApplicationMaster. // running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir"; public static final String JOB_SUBMIT_DIR = "jobSubmitDir";

View File

@ -95,6 +95,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-441. Removed unused utility methods for collections from two API YARN-441. Removed unused utility methods for collections from two API
records. (Xuan Gong via vinodkv) records. (Xuan Gong via vinodkv)
YARN-561. Modified NodeManager to set key information into the environment
of every container that it launches. (Xuan Gong via vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -37,30 +37,6 @@ public interface ApplicationConstants {
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME = public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
"AppClientSecretEnv"; "AppClientSecretEnv";
/**
* The environment variable for CONTAINER_ID. Set in AppMaster environment
* only
*/
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
/**
* The environment variable for the NM_HOST. Set in the AppMaster environment
* only
*/
public static final String NM_HOST_ENV = "NM_HOST";
/**
* The environment variable for the NM_PORT. Set in the AppMaster environment
* only
*/
public static final String NM_PORT_ENV = "NM_PORT";
/**
* The environment variable for the NM_HTTP_PORT. Set in the AppMaster environment
* only
*/
public static final String NM_HTTP_PORT_ENV = "NM_HTTP_PORT";
/** /**
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment * The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
* only * only
@ -70,8 +46,6 @@ public interface ApplicationConstants {
public static final String CONTAINER_TOKEN_FILE_ENV_NAME = public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
public static final String LOCAL_DIR_ENV = "YARN_LOCAL_DIRS";
/** /**
* The environmental variable for APPLICATION_WEB_PROXY_BASE. Set in * The environmental variable for APPLICATION_WEB_PROXY_BASE. Set in
* ApplicationMaster's environment only. This states that for all non-relative * ApplicationMaster's environment only. This states that for all non-relative
@ -177,7 +151,37 @@ public interface ApplicationConstants {
/** /**
* $HADOOP_YARN_HOME * $HADOOP_YARN_HOME
*/ */
HADOOP_YARN_HOME("HADOOP_YARN_HOME"); HADOOP_YARN_HOME("HADOOP_YARN_HOME"),
/**
* $CONTAINER_ID
* Final, exported by NodeManager and non-modifiable by users.
*/
CONTAINER_ID("CONTAINER_ID"),
/**
* $NM_HOST
* Final, exported by NodeManager and non-modifiable by users.
*/
NM_HOST("NM_HOST"),
/**
* $NM_HTTP_PORT
* Final, exported by NodeManager and non-modifiable by users.
*/
NM_HTTP_PORT("NM_HTTP_PORT"),
/**
* $NM_PORT
* Final, exported by NodeManager and non-modifiable by users.
*/
NM_PORT("NM_PORT"),
/**
* $LOCAL_DIRS
* Final, exported by NodeManager and non-modifiable by users.
*/
LOCAL_DIRS("LOCAL_DIRS");
private final String variable; private final String variable;
private Environment(String variable) { private Environment(String variable) {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ContainerExitStatus; import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
@ -320,7 +321,7 @@ public class ApplicationMaster {
Map<String, String> envs = System.getenv(); Map<String, String> envs = System.getenv();
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
if (cliParser.hasOption("app_attempt_id")) { if (cliParser.hasOption("app_attempt_id")) {
String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
@ -330,7 +331,7 @@ public class ApplicationMaster {
} }
} else { } else {
ContainerId containerId = ConverterUtils.toContainerId(envs ContainerId containerId = ConverterUtils.toContainerId(envs
.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); .get(Environment.CONTAINER_ID.name()));
appAttemptID = containerId.getApplicationAttemptId(); appAttemptID = containerId.getApplicationAttemptId();
} }
@ -338,16 +339,16 @@ public class ApplicationMaster {
throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
+ " not set in the environment"); + " not set in the environment");
} }
if (!envs.containsKey(ApplicationConstants.NM_HOST_ENV)) { if (!envs.containsKey(Environment.NM_HOST.name())) {
throw new RuntimeException(ApplicationConstants.NM_HOST_ENV throw new RuntimeException(Environment.NM_HOST.name()
+ " not set in the environment"); + " not set in the environment");
} }
if (!envs.containsKey(ApplicationConstants.NM_HTTP_PORT_ENV)) { if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
throw new RuntimeException(ApplicationConstants.NM_HTTP_PORT_ENV throw new RuntimeException(Environment.NM_HTTP_PORT
+ " not set in the environment"); + " not set in the environment");
} }
if (!envs.containsKey(ApplicationConstants.NM_PORT_ENV)) { if (!envs.containsKey(Environment.NM_PORT.name())) {
throw new RuntimeException(ApplicationConstants.NM_PORT_ENV throw new RuntimeException(Environment.NM_PORT.name()
+ " not set in the environment"); + " not set in the environment");
} }

View File

@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -58,7 +59,7 @@ import org.apache.hadoop.yarn.util.Records;
* creates a new application on the RM and negotiates a new attempt id. Then it * creates a new application on the RM and negotiates a new attempt id. Then it
* waits for the RM app state to reach be YarnApplicationState.ACCEPTED after * waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
* which it spawns the AM in another process and passes it the container id via * which it spawns the AM in another process and passes it the container id via
* env variable ApplicationConstants.AM_CONTAINER_ID_ENV. The AM can be in any * env variable Environment.CONTAINER_ID. The AM can be in any
* language. The AM can register with the RM using the attempt id obtained * language. The AM can register with the RM using the attempt id obtained
* from the container id and proceed as normal. * from the container id and proceed as normal.
* The client redirects app stdout and stderr to its own stdout and * The client redirects app stdout and stderr to its own stdout and
@ -190,10 +191,11 @@ public class UnmanagedAMLauncher {
containerId.setId(0); containerId.setId(0);
String hostname = InetAddress.getLocalHost().getHostName(); String hostname = InetAddress.getLocalHost().getHostName();
envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId); envAMList.add(Environment.CONTAINER_ID.name() + "=" + containerId);
envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname); envAMList.add(Environment.NM_HOST.name() + "=" + hostname);
envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0"); envAMList.add(Environment.NM_HTTP_PORT.name() + "=0");
envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0"); envAMList.add(Environment.NM_PORT.name() + "=0");
envAMList.add(Environment.LOCAL_DIRS.name() + "= /tmp");
envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "=" envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
+ System.currentTimeMillis()); + System.currentTimeMillis());

View File

@ -113,13 +113,13 @@ public class DefaultContainerExecutor extends ContainerExecutor {
List<String> localDirs, List<String> logDirs) throws IOException { List<String> localDirs, List<String> logDirs) throws IOException {
FsPermission dirPerm = new FsPermission(APPDIR_PERM); FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
// create container dirs on all disks // create container dirs on all disks
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
String appIdStr = String appIdStr =
ConverterUtils.toString( ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId(). containerId.getApplicationAttemptId().
getApplicationId()); getApplicationId());
for (String sLocalDir : localDirs) { for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);

View File

@ -216,11 +216,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
String user, String appId, Path containerWorkDir, String user, String appId, Path containerWorkDir,
List<String> localDirs, List<String> logDirs) throws IOException { List<String> localDirs, List<String> logDirs) throws IOException {
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
resourcesHandler.preExecute(containerId, resourcesHandler.preExecute(containerId,
container.getResource()); container.getContainer().getResource());
String resourcesOptions = resourcesHandler.getResourcesOption( String resourcesOptions = resourcesHandler.getResourcesOption(
containerId); containerId);

View File

@ -36,8 +36,8 @@ public class ApplicationContainerInitEvent extends ApplicationEvent {
final Container container; final Container container;
public ApplicationContainerInitEvent(Container container) { public ApplicationContainerInitEvent(Container container) {
super(container.getContainerID().getApplicationAttemptId().getApplicationId(), super(container.getContainer().getId().getApplicationAttemptId()
ApplicationEventType.INIT_CONTAINER); .getApplicationId(), ApplicationEventType.INIT_CONTAINER);
this.container = container; this.container = container;
} }

View File

@ -274,14 +274,14 @@ public class ApplicationImpl implements Application {
ApplicationContainerInitEvent initEvent = ApplicationContainerInitEvent initEvent =
(ApplicationContainerInitEvent) event; (ApplicationContainerInitEvent) event;
Container container = initEvent.getContainer(); Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container); app.containers.put(container.getContainer().getId(), container);
LOG.info("Adding " + container.getContainerID() LOG.info("Adding " + container.getContainer().getId()
+ " to application " + app.toString()); + " to application " + app.toString());
switch (app.getApplicationState()) { switch (app.getApplicationState()) {
case RUNNING: case RUNNING:
app.dispatcher.getEventHandler().handle(new ContainerInitEvent( app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerID())); container.getContainer().getId()));
break; break;
case INITING: case INITING:
case NEW: case NEW:
@ -302,7 +302,7 @@ public class ApplicationImpl implements Application {
// Start all the containers waiting for ApplicationInit // Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) { for (Container container : app.containers.values()) {
app.dispatcher.getEventHandler().handle(new ContainerInitEvent( app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerID())); container.getContainer().getId()));
} }
} }
} }

View File

@ -25,12 +25,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
public interface Container extends EventHandler<ContainerEvent> { public interface Container extends EventHandler<ContainerEvent> {
org.apache.hadoop.yarn.api.records.ContainerId getContainerID(); org.apache.hadoop.yarn.api.records.Container getContainer();
String getUser(); String getUser();
@ -46,5 +45,4 @@ public interface Container extends EventHandler<ContainerEvent> {
String toString(); String toString();
Resource getResource();
} }

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -312,16 +311,6 @@ public class ContainerImpl implements Container {
} }
} }
@Override
public ContainerId getContainerID() {
this.readLock.lock();
try {
return this.container.getId();
} finally {
this.readLock.unlock();
}
}
@Override @Override
public String getUser() { public String getUser() {
this.readLock.lock(); this.readLock.lock();
@ -385,10 +374,10 @@ public class ContainerImpl implements Container {
} }
@Override @Override
public Resource getResource() { public org.apache.hadoop.yarn.api.records.Container getContainer() {
this.readLock.lock(); this.readLock.lock();
try { try {
return this.container.getResource(); return this.container;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }

View File

@ -118,7 +118,7 @@ public class ContainerLaunch implements Callable<Integer> {
final ContainerLaunchContext launchContext = container.getLaunchContext(); final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,List<String>> localResources = final Map<Path,List<String>> localResources =
container.getLocalizedResources(); container.getLocalizedResources();
ContainerId containerID = container.getContainerID(); ContainerId containerID = container.getContainer().getId();
String containerIdStr = ConverterUtils.toString(containerID); String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser(); final String user = launchContext.getUser();
final List<String> command = launchContext.getCommands(); final List<String> command = launchContext.getCommands();
@ -299,7 +299,7 @@ public class ContainerLaunch implements Callable<Integer> {
* @throws IOException * @throws IOException
*/ */
public void cleanupContainer() throws IOException { public void cleanupContainer() throws IOException {
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
String containerIdStr = ConverterUtils.toString(containerId); String containerIdStr = ConverterUtils.toString(containerId);
LOG.info("Cleaning up container " + containerIdStr); LOG.info("Cleaning up container " + containerIdStr);
@ -370,7 +370,7 @@ public class ContainerLaunch implements Callable<Integer> {
*/ */
private String getContainerPid(Path pidFilePath) throws Exception { private String getContainerPid(Path pidFilePath) throws Exception {
String containerIdStr = String containerIdStr =
ConverterUtils.toString(container.getContainerID()); ConverterUtils.toString(container.getContainer().getId());
String processId = null; String processId = null;
LOG.debug("Accessing pid for container " + containerIdStr LOG.debug("Accessing pid for container " + containerIdStr
+ " from pid file " + pidFilePath); + " from pid file " + pidFilePath);
@ -547,6 +547,21 @@ public class ContainerLaunch implements Callable<Integer> {
* Non-modifiable environment variables * Non-modifiable environment variables
*/ */
environment.put(Environment.CONTAINER_ID.name(), container
.getContainer().getId().toString());
environment.put(Environment.NM_PORT.name(),
String.valueOf(container.getContainer().getNodeId().getPort()));
environment.put(Environment.NM_HOST.name(), container.getContainer()
.getNodeId().getHost());
environment.put(Environment.NM_HTTP_PORT.name(), container.getContainer()
.getNodeHttpAddress().split(":")[1]);
environment.put(Environment.LOCAL_DIRS.name(),
StringUtils.join(",", appDirs));
putEnvIfNotNull(environment, Environment.USER.name(), container.getUser()); putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
putEnvIfNotNull(environment, putEnvIfNotNull(environment,
@ -566,11 +581,6 @@ public class ContainerLaunch implements Callable<Integer> {
Environment.HADOOP_CONF_DIR.name(), Environment.HADOOP_CONF_DIR.name(),
System.getenv(Environment.HADOOP_CONF_DIR.name()) System.getenv(Environment.HADOOP_CONF_DIR.name())
); );
putEnvIfNotNull(environment,
ApplicationConstants.LOCAL_DIR_ENV,
StringUtils.join(",", appDirs)
);
if (!Shell.WINDOWS) { if (!Shell.WINDOWS) {
environment.put("JVM_PID", "$$"); environment.put("JVM_PID", "$$");

View File

@ -111,7 +111,7 @@ public class ContainersLauncher extends AbstractService
public void handle(ContainersLauncherEvent event) { public void handle(ContainersLauncherEvent event) {
// TODO: ContainersLauncher launches containers one by one!! // TODO: ContainersLauncher launches containers one by one!!
Container container = event.getContainer(); Container container = event.getContainer();
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_CONTAINER: case LAUNCH_CONTAINER:
Application app = Application app =

View File

@ -359,13 +359,15 @@ public class ResourceLocalizationService extends CompositeService
ContainerLocalizationRequestEvent rsrcReqs) { ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer(); Container c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext( LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials()); c.getUser(), c.getContainer().getId(), c.getCredentials());
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
rsrcReqs.getRequestedResources(); rsrcReqs.getRequestedResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e : for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) { rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), LocalResourcesTracker tracker =
c.getContainerID().getApplicationAttemptId().getApplicationId()); getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainer().getId().getApplicationAttemptId()
.getApplicationId());
for (LocalResourceRequest req : e.getValue()) { for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt)); tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
} }
@ -394,19 +396,21 @@ public class ResourceLocalizationService extends CompositeService
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e : for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) { rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerID().getApplicationAttemptId().getApplicationId()); c.getContainer().getId().getApplicationAttemptId()
.getApplicationId());
for (LocalResourceRequest req : e.getValue()) { for (LocalResourceRequest req : e.getValue()) {
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID())); tracker.handle(new ResourceReleaseEvent(req,
c.getContainer().getId()));
} }
} }
String locId = ConverterUtils.toString(c.getContainerID()); String locId = ConverterUtils.toString(c.getContainer().getId());
localizerTracker.cleanupPrivLocalizers(locId); localizerTracker.cleanupPrivLocalizers(locId);
// Delete the container directories // Delete the container directories
String userName = c.getUser(); String userName = c.getUser();
String containerIDStr = c.toString(); String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString( String appIDStr = ConverterUtils.toString(
c.getContainerID().getApplicationAttemptId().getApplicationId()); c.getContainer().getId().getApplicationAttemptId().getApplicationId());
for (String localDir : dirsHandler.getLocalDirs()) { for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned container-dir // Delete the user-owned container-dir
@ -425,8 +429,9 @@ public class ResourceLocalizationService extends CompositeService
delService.delete(null, containerSysDir, new Path[] {}); delService.delete(null, containerSysDir, new Path[] {});
} }
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(), dispatcher.getEventHandler().handle(
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); new ContainerEvent(c.getContainer().getId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
} }

View File

@ -60,7 +60,7 @@ public class ContainerInfo {
public ContainerInfo(final Context nmContext, final Container container, public ContainerInfo(final Context nmContext, final Container container,
String requestUri, String pathPrefix) { String requestUri, String pathPrefix) {
this.id = container.getContainerID().toString(); this.id = container.getContainer().getId().toString();
this.nodeId = nmContext.getNodeId().toString(); this.nodeId = nmContext.getNodeId().toString();
ContainerStatus containerData = container.cloneAndGetContainerStatus(); ContainerStatus containerData = container.cloneAndGetContainerStatus();
this.exitCode = containerData.getExitStatus(); this.exitCode = containerData.getExitStatus();
@ -74,7 +74,7 @@ public class ContainerInfo {
} }
this.user = container.getUser(); this.user = container.getUser();
Resource res = container.getResource(); Resource res = container.getContainer().getResource();
if (res != null) { if (res != null) {
this.totalMemoryNeededMB = res.getMemory(); this.totalMemoryNeededMB = res.getMemory();
} }

View File

@ -88,10 +88,10 @@ public class DummyContainerManager extends ContainerManagerImpl {
.getRequestedResources().values()) { .getRequestedResources().values()) {
for (LocalResourceRequest req : rc) { for (LocalResourceRequest req : rc) {
LOG.info("DEBUG: " + req + ":" LOG.info("DEBUG: " + req + ":"
+ rsrcReqs.getContainer().getContainerID()); + rsrcReqs.getContainer().getContainer().getId());
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainerResourceLocalizedEvent(rsrcReqs.getContainer() new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
.getContainerID(), req, new Path("file:///local" .getContainer().getId(), req, new Path("file:///local"
+ req.getPath().toUri().getPath()))); + req.getPath().toUri().getPath())));
} }
} }
@ -101,7 +101,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
((ContainerLocalizationEvent) event).getContainer(); ((ContainerLocalizationEvent) event).getContainer();
// TODO: delete the container dir // TODO: delete the container dir
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
new ContainerEvent(container.getContainerID(), new ContainerEvent(container.getContainer().getId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break; break;
case DESTROY_APPLICATION_RESOURCES: case DESTROY_APPLICATION_RESOURCES:
@ -130,7 +130,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
@Override @Override
public void handle(ContainersLauncherEvent event) { public void handle(ContainersLauncherEvent event) {
Container container = event.getContainer(); Container container = event.getContainer();
ContainerId containerId = container.getContainerID(); ContainerId containerId = container.getContainer().getId();
switch (event.getType()) { switch (event.getType()) {
case LAUNCH_CONTAINER: case LAUNCH_CONTAINER:
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(

View File

@ -186,7 +186,10 @@ public class TestLinuxContainerExecutor {
ContainerLaunchContext context = mock(ContainerLaunchContext.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>(); HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(containerAPI);
when(container.getContainer().getId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context); when(container.getLaunchContext()).thenReturn(context);
when(context.getEnvironment()).thenReturn(env); when(context.getEnvironment()).thenReturn(env);

View File

@ -107,7 +107,10 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerLaunchContext context = mock(ContainerLaunchContext.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String,String>(); HashMap<String, String> env = new HashMap<String,String>();
when(container.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(containerAPI);
when(container.getContainer().getId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context); when(container.getLaunchContext()).thenReturn(context);
when(cId.toString()).thenReturn(containerId); when(cId.toString()).thenReturn(containerId);
@ -225,7 +228,10 @@ public class TestLinuxContainerExecutorWithMocks {
ContainerLaunchContext context = mock(ContainerLaunchContext.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class);
HashMap<String, String> env = new HashMap<String, String>(); HashMap<String, String> env = new HashMap<String, String>();
when(container.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(containerAPI);
when(container.getContainer().getId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context); when(container.getLaunchContext()).thenReturn(context);
when(cId.toString()).thenReturn(containerId); when(cId.toString()).thenReturn(containerId);

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -178,6 +179,9 @@ public class TestNodeManagerShutdown {
Container mockContainer = mock(Container.class); Container mockContainer = mock(Container.class);
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
when(mockContainer.getNodeId()).thenReturn(nodeId);
when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:12345");
containerLaunchContext.setUser(user); containerLaunchContext.setUser(user);
URL localResourceUri = URL localResourceUri =

View File

@ -150,6 +150,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(512, 1)); BuilderUtils.newResource(512, 1));
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
StartContainerRequest startRequest = StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class); recordFactory.newRecordInstance(StartContainerRequest.class);
@ -245,6 +249,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(100, 1)); // MB BuilderUtils.newResource(100, 1)); // MB
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainer(mockContainer); startRequest.setContainer(mockContainer);
@ -352,7 +360,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(100, 1)); // MB BuilderUtils.newResource(100, 1)); // MB
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext); startRequest.setContainerLaunchContext(containerLaunchContext);
startRequest.setContainer(mockContainer); startRequest.setContainer(mockContainer);
@ -444,6 +454,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getResource()).thenReturn( when(mockContainer.getResource()).thenReturn(
BuilderUtils.newResource(100, 1)); BuilderUtils.newResource(100, 1));
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
// containerLaunchContext.command = new ArrayList<CharSequence>(); // containerLaunchContext.command = new ArrayList<CharSequence>();

View File

@ -83,7 +83,7 @@ public class TestApplication {
for (int i = 0; i < wa.containers.size(); i++) { for (int i = 0; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(i) argThat(new ContainerInitMatcher(wa.containers.get(i)
.getContainerID()))); .getContainer().getId())));
} }
} finally { } finally {
if (wa != null) if (wa != null)
@ -108,7 +108,7 @@ public class TestApplication {
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(0) argThat(new ContainerInitMatcher(wa.containers.get(0)
.getContainerID()))); .getContainer().getId())));
wa.initContainer(1); wa.initContainer(1);
wa.initContainer(2); wa.initContainer(2);
@ -118,7 +118,7 @@ public class TestApplication {
for (int i = 1; i < wa.containers.size(); i++) { for (int i = 1; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerInitMatcher(wa.containers.get(i) argThat(new ContainerInitMatcher(wa.containers.get(i)
.getContainerID()))); .getContainer().getId())));
} }
} finally { } finally {
if (wa != null) if (wa != null)
@ -233,7 +233,7 @@ public class TestApplication {
for (int i = 1; i < wa.containers.size(); i++) { for (int i = 1; i < wa.containers.size(); i++) {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(i) argThat(new ContainerKillMatcher(wa.containers.get(i)
.getContainerID()))); .getContainer().getId())));
} }
wa.containerFinished(1); wa.containerFinished(1);
@ -354,7 +354,7 @@ public class TestApplication {
verify(wa.containerBus).handle( verify(wa.containerBus).handle(
argThat(new ContainerKillMatcher(wa.containers.get(0) argThat(new ContainerKillMatcher(wa.containers.get(0)
.getContainerID()))); .getContainer().getId())));
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT, assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
wa.app.getApplicationState()); wa.app.getApplicationState());
@ -487,7 +487,7 @@ public class TestApplication {
public void containerFinished(int containerNum) { public void containerFinished(int containerNum) {
app.handle(new ApplicationContainerFinishedEvent(containers.get( app.handle(new ApplicationContainerFinishedEvent(containers.get(
containerNum).getContainerID())); containerNum).getContainer().getId()));
drainDispatcherEvents(); drainDispatcherEvents();
} }
@ -514,7 +514,10 @@ public class TestApplication {
BuilderUtils.newApplicationAttemptId(appId, 1); BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
Container c = mock(Container.class); Container c = mock(Container.class);
when(c.getContainerID()).thenReturn(cId); org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(c.getContainer()).thenReturn(containerAPI);
when(c.getContainer().getId()).thenReturn(cId);
ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class); ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class);
when(c.getLaunchContext()).thenReturn(launchContext); when(c.getLaunchContext()).thenReturn(launchContext);
when(launchContext.getApplicationACLs()).thenReturn( when(launchContext.getApplicationACLs()).thenReturn(

View File

@ -376,7 +376,7 @@ public class TestContainer {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o; ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
&& wcf.cId == evt.getContainer().getContainerID(); && wcf.cId == evt.getContainer().getContainer().getId();
} }
}; };
verify(wc.launcherBus).handle(argThat(matchesLaunchReq)); verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
@ -639,7 +639,7 @@ public class TestContainer {
Path p = new Path(cache, rsrc.getKey()); Path p = new Path(cache, rsrc.getKey());
localPaths.put(p, Arrays.asList(rsrc.getKey())); localPaths.put(p, Arrays.asList(rsrc.getKey()));
// rsrc copied to p // rsrc copied to p
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), c.handle(new ContainerResourceLocalizedEvent(c.getContainer().getId(),
req, p)); req, p));
} }
drainDispatcherEvents(); drainDispatcherEvents();
@ -662,7 +662,8 @@ public class TestContainer {
LocalResource rsrc = localResources.get(rsrcKey); LocalResource rsrc = localResources.get(rsrcKey);
LocalResourceRequest req = new LocalResourceRequest(rsrc); LocalResourceRequest req = new LocalResourceRequest(rsrc);
Exception e = new Exception("Fake localization error"); Exception e = new Exception("Fake localization error");
c.handle(new ContainerResourceFailedEvent(c.getContainerID(), req, e)); c.handle(new ContainerResourceFailedEvent(c.getContainer()
.getId(), req, e));
drainDispatcherEvents(); drainDispatcherEvents();
} }
@ -677,7 +678,7 @@ public class TestContainer {
++counter; ++counter;
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
Exception e = new Exception("Fake localization error"); Exception e = new Exception("Fake localization error");
c.handle(new ContainerResourceFailedEvent(c.getContainerID(), c.handle(new ContainerResourceFailedEvent(c.getContainer().getId(),
req, e)); req, e));
} }
drainDispatcherEvents(); drainDispatcherEvents();

View File

@ -37,6 +37,7 @@ import java.util.Map;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -51,14 +52,13 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -150,50 +150,17 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
} }
} }
} }
// this is a dirty hack - but should be ok for a unittest.
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void setNewEnvironmentHack(Map<String, String> newenv) throws Exception {
try {
Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
Field field = cl.getDeclaredField("theEnvironment");
field.setAccessible(true);
Map<String, String> env = (Map<String, String>)field.get(null);
env.clear();
env.putAll(newenv);
Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
ciField.setAccessible(true);
Map<String, String> cienv = (Map<String, String>)ciField.get(null);
cienv.clear();
cienv.putAll(newenv);
} catch (NoSuchFieldException e) {
Class[] classes = Collections.class.getDeclaredClasses();
Map<String, String> env = System.getenv();
for (Class cl : classes) {
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
Field field = cl.getDeclaredField("m");
field.setAccessible(true);
Object obj = field.get(env);
Map<String, String> map = (Map<String, String>) obj;
map.clear();
map.putAll(newenv);
}
}
}
}
/** /**
* See if environment variable is forwarded using sanitizeEnv. * See if environment variable is forwarded using sanitizeEnv.
* @throws Exception * @throws Exception
*/ */
@Test @Test (timeout = 5000)
public void testContainerEnvVariables() throws Exception { public void testContainerEnvVariables() throws Exception {
containerManager.start(); containerManager.start();
Map<String, String> envWithDummy = new HashMap<String, String>(); ContainerLaunchContext containerLaunchContext =
envWithDummy.putAll(System.getenv()); recordFactory.newRecordInstance(ContainerLaunchContext.class);
envWithDummy.put(Environment.MALLOC_ARENA_MAX.name(), "99");
setNewEnvironmentHack(envWithDummy);
Container mockContainer = mock(Container.class); Container mockContainer = mock(Container.class);
// ////// Construct the Container-id // ////// Construct the Container-id
@ -207,34 +174,54 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
ContainerId cId = ContainerId cId =
recordFactory.newRecordInstance(ContainerId.class); recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(appAttemptId); cId.setApplicationAttemptId(appAttemptId);
String malloc = System.getenv(Environment.MALLOC_ARENA_MAX.name()); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
Map<String, String> userSetEnv = new HashMap<String, String>();
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
containerLaunchContext.setUser(user);
containerLaunchContext.setEnvironment(userSetEnv);
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile); PrintWriter fileWriter = new PrintWriter(scriptFile);
File processStartFile = File processStartFile =
new File(tmpDir, "env_vars.txt").getAbsoluteFile(); new File(tmpDir, "env_vars.txt").getAbsoluteFile();
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
fileWriter.println("@echo " + Environment.MALLOC_ARENA_MAX.$() + "> " + fileWriter.println("@echo " + Environment.CONTAINER_ID.$() + "> "
processStartFile); + processStartFile);
fileWriter.println("@echo " + cId + ">> " + processStartFile); fileWriter.println("@echo " + Environment.NM_HOST.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.NM_PORT.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.NM_HTTP_PORT.$() + ">> "
+ processStartFile);
fileWriter.println("@echo " + Environment.LOCAL_DIRS.$() + ">> "
+ processStartFile);
fileWriter.println("@ping -n 100 127.0.0.1 >nul"); fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else { } else {
fileWriter.write("\numask 0"); // So that start file is readable by the test fileWriter.write("\numask 0"); // So that start file is readable by the test
fileWriter.write("\necho " + Environment.MALLOC_ARENA_MAX.$() + " > " + fileWriter.write("\necho $" + Environment.CONTAINER_ID.name() + " > "
processStartFile); + processStartFile);
fileWriter.write("\necho $" + Environment.NM_HOST.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.NM_PORT.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.NM_HTTP_PORT.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $" + Environment.LOCAL_DIRS.name() + " >> "
+ processStartFile);
fileWriter.write("\necho $$ >> " + processStartFile); fileWriter.write("\necho $$ >> " + processStartFile);
fileWriter.write("\nexec sleep 100"); fileWriter.write("\nexec sleep 100");
} }
fileWriter.close(); fileWriter.close();
assert(malloc != null && !"".equals(malloc));
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
when(mockContainer.getId()).thenReturn(cId);
containerLaunchContext.setUser(user);
// upload the script file so that the container can run it // upload the script file so that the container can run it
URL resource_alpha = URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS ConverterUtils.getYarnUrlFromPath(localFS
@ -272,9 +259,40 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
processStartFile.exists()); processStartFile.exists());
// Now verify the contents of the file // Now verify the contents of the file
List<String> localDirs = dirsHandler.getLocalDirs();
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (String localDir : localDirs) {
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, appId.toString()));
}
BufferedReader reader = BufferedReader reader =
new BufferedReader(new FileReader(processStartFile)); new BufferedReader(new FileReader(processStartFile));
Assert.assertEquals(malloc, reader.readLine()); Assert.assertEquals(cId.toString(), reader.readLine());
Assert.assertEquals(mockContainer.getNodeId().getHost(),
reader.readLine());
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
reader.readLine());
Assert.assertEquals(
String.valueOf(mockContainer.getNodeHttpAddress().split(":")[1]),
reader.readLine());
Assert.assertEquals(StringUtils.join(",", appDirs), reader.readLine());
Assert.assertEquals(cId.toString(), containerLaunchContext
.getEnvironment().get(Environment.CONTAINER_ID.name()));
Assert.assertEquals(mockContainer.getNodeId().getHost(),
containerLaunchContext.getEnvironment()
.get(Environment.NM_HOST.name()));
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
containerLaunchContext.getEnvironment().get(
Environment.NM_PORT.name()));
Assert.assertEquals(
mockContainer.getNodeHttpAddress().split(":")[1],
containerLaunchContext.getEnvironment().get(
Environment.NM_HTTP_PORT.name()));
Assert.assertEquals(StringUtils.join(",", appDirs), containerLaunchContext
.getEnvironment().get(Environment.LOCAL_DIRS.name()));
// Get the pid of the process // Get the pid of the process
String pid = reader.readLine().trim(); String pid = reader.readLine().trim();
// No more lines // No more lines
@ -354,6 +372,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
containerLaunchContext.setUser(user); containerLaunchContext.setUser(user);

View File

@ -494,7 +494,7 @@ public class TestResourceLocalizationService {
Thread.sleep(1000); Thread.sleep(1000);
dispatcher.await(); dispatcher.await();
String appStr = ConverterUtils.toString(appId); String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString(); String ctnrStr = c.getContainer().getId().toString();
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class); ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
verify(exec).startLocalizer(tokenPathCaptor.capture(), verify(exec).startLocalizer(tokenPathCaptor.capture(),
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr), isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
@ -570,7 +570,7 @@ public class TestResourceLocalizationService {
public boolean matches(Object o) { public boolean matches(Object o) {
ContainerEvent evt = (ContainerEvent) o; ContainerEvent evt = (ContainerEvent) o;
return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
&& c.getContainerID() == evt.getContainerID(); && c.getContainer().getId() == evt.getContainerID();
} }
}; };
// total 2 resource localzation calls. one for each resource. // total 2 resource localzation calls. one for each resource.
@ -759,11 +759,11 @@ public class TestResourceLocalizationService {
// Container - 1 // Container - 1
ContainerImpl container1 = createMockContainer(user, 1); ContainerImpl container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerID().toString(); String localizerId1 = container1.getContainer().getId().toString();
rls.getPrivateLocalizers().put( rls.getPrivateLocalizers().put(
localizerId1, localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1 rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainerID(), null), localizerId1)); .getContainer().getId(), null), localizerId1));
LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1); LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
dispatcher1.getEventHandler().handle( dispatcher1.getEventHandler().handle(
@ -774,11 +774,11 @@ public class TestResourceLocalizationService {
// Container - 2 now makes the request. // Container - 2 now makes the request.
ContainerImpl container2 = createMockContainer(user, 2); ContainerImpl container2 = createMockContainer(user, 2);
String localizerId2 = container2.getContainerID().toString(); String localizerId2 = container2.getContainer().getId().toString();
rls.getPrivateLocalizers().put( rls.getPrivateLocalizers().put(
localizerId2, localizerId2,
rls.new LocalizerRunner(new LocalizerContext(user, container2 rls.new LocalizerRunner(new LocalizerContext(user, container2
.getContainerID(), null), localizerId2)); .getContainer().getId(), null), localizerId2));
LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2); LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
dispatcher1.getEventHandler().handle( dispatcher1.getEventHandler().handle(
createContainerLocalizationEvent(container2, createContainerLocalizationEvent(container2,
@ -919,11 +919,11 @@ public class TestResourceLocalizationService {
// Container - 1 // Container - 1
Container container1 = createMockContainer(user, 1); Container container1 = createMockContainer(user, 1);
String localizerId1 = container1.getContainerID().toString(); String localizerId1 = container1.getContainer().getId().toString();
rls.getPrivateLocalizers().put( rls.getPrivateLocalizers().put(
localizerId1, localizerId1,
rls.new LocalizerRunner(new LocalizerContext(user, container1 rls.new LocalizerRunner(new LocalizerContext(user, container1
.getContainerID(), null), localizerId1)); .getContainer().getId(), null), localizerId1));
// Creating two requests for container // Creating two requests for container
// 1) Private resource // 1) Private resource
@ -1314,7 +1314,10 @@ public class TestResourceLocalizationService {
private ContainerImpl createMockContainer(String user, int containerId) { private ContainerImpl createMockContainer(String user, int containerId) {
ContainerImpl container = mock(ContainerImpl.class); ContainerImpl container = mock(ContainerImpl.class);
when(container.getContainerID()).thenReturn( org.apache.hadoop.yarn.api.records.Container c =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(container.getContainer()).thenReturn(c);
when(container.getContainer().getId()).thenReturn(
BuilderUtils.newContainerId(1, 1, 1, containerId)); BuilderUtils.newContainerId(1, 1, 1, containerId));
when(container.getUser()).thenReturn(user); when(container.getUser()).thenReturn(user);
Credentials mockCredentials = mock(Credentials.class); Credentials mockCredentials = mock(Credentials.class);
@ -1354,8 +1357,11 @@ public class TestResourceLocalizationService {
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1); BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
org.apache.hadoop.yarn.api.records.Container containerAPI =
mock(org.apache.hadoop.yarn.api.records.Container.class);
when(c.getContainer()).thenReturn(containerAPI);
when(c.getUser()).thenReturn("user0"); when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId); when(c.getContainer().getId()).thenReturn(cId);
Credentials creds = new Credentials(); Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id)); creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds); when(c.getCredentials()).thenReturn(creds);

View File

@ -213,6 +213,9 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
cId.setApplicationAttemptId(appAttemptId); cId.setApplicationAttemptId(appAttemptId);
when(mockContainer.getId()).thenReturn(cId); when(mockContainer.getId()).thenReturn(cId);
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
when(mockContainer.getNodeHttpAddress()).thenReturn(
context.getNodeId().getHost() + ":12345");
containerLaunchContext.setUser(user); containerLaunchContext.setUser(user);
URL resource_alpha = URL resource_alpha =

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import static org.mockito.Mockito.*;
public class MockContainer implements Container { public class MockContainer implements Container {
@ -48,6 +49,7 @@ public class MockContainer implements Container {
private final Map<Path, List<String>> resource = private final Map<Path, List<String>> resource =
new HashMap<Path, List<String>>(); new HashMap<Path, List<String>>();
private RecordFactory recordFactory; private RecordFactory recordFactory;
private org.apache.hadoop.yarn.api.records.Container mockContainer;
public MockContainer(ApplicationAttemptId appAttemptId, public MockContainer(ApplicationAttemptId appAttemptId,
Dispatcher dispatcher, Configuration conf, String user, Dispatcher dispatcher, Configuration conf, String user,
@ -62,17 +64,14 @@ public class MockContainer implements Container {
launchContext.setUser(user); launchContext.setUser(user);
this.state = ContainerState.NEW; this.state = ContainerState.NEW;
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
when(mockContainer.getId()).thenReturn(id);
} }
public void setState(ContainerState state) { public void setState(ContainerState state) {
this.state = state; this.state = state;
} }
@Override
public ContainerId getContainerID() {
return id;
}
@Override @Override
public String getUser() { public String getUser() {
return user; return user;
@ -119,8 +118,7 @@ public class MockContainer implements Container {
} }
@Override @Override
public Resource getResource() { public org.apache.hadoop.yarn.api.records.Container getContainer() {
return null; return this.mockContainer;
} }
} }

View File

@ -185,16 +185,18 @@ public class TestNMWebServicesApps extends JerseyTest {
app.getUser(), app.getAppId(), 1); app.getUser(), app.getAppId(), 1);
Container container2 = new MockContainer(appAttemptId, dispatcher, conf, Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
app.getUser(), app.getAppId(), 2); app.getUser(), app.getAppId(), 2);
nmContext.getContainers().put(container1.getContainerID(), container1); nmContext.getContainers()
nmContext.getContainers().put(container2.getContainerID(), container2); .put(container1.getContainer().getId(), container1);
nmContext.getContainers()
.put(container2.getContainer().getId(), container2);
app.getContainers().put(container1.getContainerID(), container1); app.getContainers().put(container1.getContainer().getId(), container1);
app.getContainers().put(container2.getContainerID(), container2); app.getContainers().put(container2.getContainer().getId(), container2);
HashMap<String, String> hash = new HashMap<String, String>(); HashMap<String, String> hash = new HashMap<String, String>();
hash.put(container1.getContainerID().toString(), container1 hash.put(container1.getContainer().getId().toString(), container1
.getContainerID().toString()); .getContainer().getId().toString());
hash.put(container2.getContainerID().toString(), container2 hash.put(container2.getContainer().getId().toString(), container2
.getContainerID().toString()); .getContainer().getId().toString());
return hash; return hash;
} }

View File

@ -186,16 +186,18 @@ public class TestNMWebServicesContainers extends JerseyTest {
app.getUser(), app.getAppId(), 1); app.getUser(), app.getAppId(), 1);
Container container2 = new MockContainer(appAttemptId, dispatcher, conf, Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
app.getUser(), app.getAppId(), 2); app.getUser(), app.getAppId(), 2);
nmContext.getContainers().put(container1.getContainerID(), container1); nmContext.getContainers()
nmContext.getContainers().put(container2.getContainerID(), container2); .put(container1.getContainer().getId(), container1);
nmContext.getContainers()
.put(container2.getContainer().getId(), container2);
app.getContainers().put(container1.getContainerID(), container1); app.getContainers().put(container1.getContainer().getId(), container1);
app.getContainers().put(container2.getContainerID(), container2); app.getContainers().put(container2.getContainer().getId(), container2);
HashMap<String, String> hash = new HashMap<String, String>(); HashMap<String, String> hash = new HashMap<String, String>();
hash.put(container1.getContainerID().toString(), container1 hash.put(container1.getContainer().getId().toString(), container1
.getContainerID().toString()); .getContainer().getId().toString());
hash.put(container2.getContainerID().toString(), container2 hash.put(container2.getContainer().getId().toString(), container2
.getContainerID().toString()); .getContainer().getId().toString());
return hash; return hash;
} }
@ -468,7 +470,7 @@ public class TestNMWebServicesContainers extends JerseyTest {
String state, String user, int exitCode, String diagnostics, String state, String user, int exitCode, String diagnostics,
String nodeId, int totalMemoryNeededMB, String logsLink) String nodeId, int totalMemoryNeededMB, String logsLink)
throws JSONException, Exception { throws JSONException, Exception {
WebServicesTestUtils.checkStringMatch("id", cont.getContainerID() WebServicesTestUtils.checkStringMatch("id", cont.getContainer().getId()
.toString(), id); .toString(), id);
WebServicesTestUtils.checkStringMatch("state", cont.getContainerState() WebServicesTestUtils.checkStringMatch("state", cont.getContainerState()
.toString(), state); .toString(), state);
@ -481,8 +483,9 @@ public class TestNMWebServicesContainers extends JerseyTest {
WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId() WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId()
.toString(), nodeId); .toString(), nodeId);
assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB); assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB);
String shortLink = ujoin("containerlogs", cont.getContainerID().toString(), String shortLink =
cont.getUser()); ujoin("containerlogs", cont.getContainer().getId().toString(),
cont.getUser());
assertTrue("containerLogsLink wrong", logsLink.contains(shortLink)); assertTrue("containerLogsLink wrong", logsLink.contains(shortLink));
} }

View File

@ -178,17 +178,7 @@ public class AMLauncher implements Runnable {
Map<String, String> environment = container.getEnvironment(); Map<String, String> environment = container.getEnvironment();
environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV, environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV,
application.getWebProxyBase()); application.getWebProxyBase());
// Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be // Set AppSubmitTime and MaxAppAttempts to be consumable by the AM.
// consumable by the AM.
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV,
containerID.toString());
environment.put(ApplicationConstants.NM_HOST_ENV, masterContainer
.getNodeId().getHost());
environment.put(ApplicationConstants.NM_PORT_ENV,
String.valueOf(masterContainer.getNodeId().getPort()));
String parts[] =
masterContainer.getNodeHttpAddress().split(":");
environment.put(ApplicationConstants.NM_HTTP_PORT_ENV, parts[1]);
ApplicationId applicationId = ApplicationId applicationId =
application.getAppAttemptId().getApplicationId(); application.getAppAttemptId().getApplicationId();
environment.put( environment.put(

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -71,17 +70,17 @@ public class TestApplicationMasterLauncher {
launched = true; launched = true;
Map<String, String> env = Map<String, String> env =
request.getContainerLaunchContext().getEnvironment(); request.getContainerLaunchContext().getEnvironment();
containerIdAtContainerManager =
env.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
ContainerId containerId = ContainerId containerId =
ConverterUtils.toContainerId(containerIdAtContainerManager); request.getContainer().getId();
containerIdAtContainerManager = containerId.toString();
attemptIdAtContainerManager = attemptIdAtContainerManager =
containerId.getApplicationAttemptId().toString(); containerId.getApplicationAttemptId().toString();
nmHostAtContainerManager = env.get(ApplicationConstants.NM_HOST_ENV); nmHostAtContainerManager = request.getContainer().getNodeId().getHost();
nmPortAtContainerManager = nmPortAtContainerManager =
Integer.parseInt(env.get(ApplicationConstants.NM_PORT_ENV)); request.getContainer().getNodeId().getPort();
nmHttpPortAtContainerManager = nmHttpPortAtContainerManager =
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV)); Integer.parseInt(request.getContainer().getNodeHttpAddress()
.split(":")[1]);
submitTimeAtContainerManager = submitTimeAtContainerManager =
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV)); Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
maxAppAttempts = maxAppAttempts =