YARN-561. Modified NodeManager to set key information into the environment of every container that it launches. Contributed by Xuan Gong.
MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs anyways after YARN-561. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1471207 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
700a6aa0a0
commit
2dfecb78e5
|
@ -47,6 +47,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
instead of extracting and populating information itself to start any
|
||||
container. (vinodkv)
|
||||
|
||||
MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
|
||||
anyways after YARN-561. (Xuan Gong via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
/**
|
||||
|
@ -280,7 +281,7 @@ public class LocalContainerLauncher extends AbstractService implements
|
|||
// Use the AM's local dir env to generate the intermediate step
|
||||
// output files
|
||||
String[] localSysDirs = StringUtils.getTrimmedStrings(
|
||||
System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
|
||||
System.getenv(Environment.LOCAL_DIRS.name()));
|
||||
conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
|
||||
LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
|
||||
+ conf.get(MRConfig.LOCAL_DIR));
|
||||
|
|
|
@ -111,8 +111,6 @@ public class MapReduceChildJVM {
|
|||
MRJobConfig.STDERR_LOGFILE_ENV,
|
||||
getTaskLogFile(TaskLog.LogName.STDERR)
|
||||
);
|
||||
environment.put(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV,
|
||||
conf.get(MRJobConfig.APPLICATION_ATTEMPT_ID).toString());
|
||||
}
|
||||
|
||||
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
|
||||
|
|
|
@ -57,6 +57,9 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.log4j.LogManager;
|
||||
|
||||
/**
|
||||
|
@ -216,7 +219,7 @@ class YarnChild {
|
|||
*/
|
||||
private static void configureLocalDirs(Task task, JobConf job) throws IOException {
|
||||
String[] localSysDirs = StringUtils.getTrimmedStrings(
|
||||
System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
|
||||
System.getenv(Environment.LOCAL_DIRS.name()));
|
||||
job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
|
||||
LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
|
||||
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
||||
|
@ -256,12 +259,14 @@ class YarnChild {
|
|||
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
|
||||
job.setCredentials(credentials);
|
||||
|
||||
String appAttemptIdEnv = System
|
||||
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
|
||||
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ConverterUtils.toContainerId(
|
||||
System.getenv(Environment.CONTAINER_ID.name()))
|
||||
.getApplicationAttemptId();
|
||||
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptId);
|
||||
// Set it in conf, so as to be able to be used the the OutputCommitter.
|
||||
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
|
||||
.parseInt(appAttemptIdEnv));
|
||||
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
|
||||
appAttemptId.getAttemptId());
|
||||
|
||||
// set tcp nodelay
|
||||
job.setBoolean("ipc.client.tcpnodelay", true);
|
||||
|
|
|
@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.SystemClock;
|
|||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -1270,22 +1271,22 @@ public class MRAppMaster extends CompositeService {
|
|||
try {
|
||||
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||
String containerIdStr =
|
||||
System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
|
||||
String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
|
||||
String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
|
||||
System.getenv(Environment.CONTAINER_ID.name());
|
||||
String nodeHostString = System.getenv(Environment.NM_HOST.name());
|
||||
String nodePortString = System.getenv(Environment.NM_PORT.name());
|
||||
String nodeHttpPortString =
|
||||
System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
|
||||
System.getenv(Environment.NM_HTTP_PORT.name());
|
||||
String appSubmitTimeStr =
|
||||
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
|
||||
String maxAppAttempts =
|
||||
System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
|
||||
|
||||
validateInputParam(containerIdStr,
|
||||
ApplicationConstants.AM_CONTAINER_ID_ENV);
|
||||
validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
|
||||
validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
|
||||
Environment.CONTAINER_ID.name());
|
||||
validateInputParam(nodeHostString, Environment.NM_HOST.name());
|
||||
validateInputParam(nodePortString, Environment.NM_PORT.name());
|
||||
validateInputParam(nodeHttpPortString,
|
||||
ApplicationConstants.NM_HTTP_PORT_ENV);
|
||||
Environment.NM_HTTP_PORT.name());
|
||||
validateInputParam(appSubmitTimeStr,
|
||||
ApplicationConstants.APP_SUBMIT_TIME_ENV);
|
||||
validateInputParam(maxAppAttempts,
|
||||
|
|
|
@ -569,8 +569,6 @@ public interface MRJobConfig {
|
|||
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
|
||||
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
|
||||
|
||||
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
|
||||
|
||||
// This should be the directory where splits file gets localized on the node
|
||||
// running ApplicationMaster.
|
||||
public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
|
||||
|
|
|
@ -24,6 +24,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
YARN-441. Removed unused utility methods for collections from two API
|
||||
records. (Xuan Gong via vinodkv)
|
||||
|
||||
YARN-561. Modified NodeManager to set key information into the environment
|
||||
of every container that it launches. (Xuan Gong via vinodkv)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||
|
|
|
@ -36,30 +36,6 @@ public interface ApplicationConstants {
|
|||
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
|
||||
"AppClientSecretEnv";
|
||||
|
||||
/**
|
||||
* The environment variable for CONTAINER_ID. Set in AppMaster environment
|
||||
* only
|
||||
*/
|
||||
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
|
||||
|
||||
/**
|
||||
* The environment variable for the NM_HOST. Set in the AppMaster environment
|
||||
* only
|
||||
*/
|
||||
public static final String NM_HOST_ENV = "NM_HOST";
|
||||
|
||||
/**
|
||||
* The environment variable for the NM_PORT. Set in the AppMaster environment
|
||||
* only
|
||||
*/
|
||||
public static final String NM_PORT_ENV = "NM_PORT";
|
||||
|
||||
/**
|
||||
* The environment variable for the NM_HTTP_PORT. Set in the AppMaster environment
|
||||
* only
|
||||
*/
|
||||
public static final String NM_HTTP_PORT_ENV = "NM_HTTP_PORT";
|
||||
|
||||
/**
|
||||
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment
|
||||
* only
|
||||
|
@ -69,8 +45,6 @@ public interface ApplicationConstants {
|
|||
public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
|
||||
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
|
||||
|
||||
public static final String LOCAL_DIR_ENV = "YARN_LOCAL_DIRS";
|
||||
|
||||
/**
|
||||
* The environmental variable for APPLICATION_WEB_PROXY_BASE. Set in
|
||||
* ApplicationMaster's environment only. This states that for all non-relative
|
||||
|
@ -176,7 +150,37 @@ public interface ApplicationConstants {
|
|||
/**
|
||||
* $HADOOP_YARN_HOME
|
||||
*/
|
||||
HADOOP_YARN_HOME("HADOOP_YARN_HOME");
|
||||
HADOOP_YARN_HOME("HADOOP_YARN_HOME"),
|
||||
|
||||
/**
|
||||
* $CONTAINER_ID
|
||||
* Final, exported by NodeManager and non-modifiable by users.
|
||||
*/
|
||||
CONTAINER_ID("CONTAINER_ID"),
|
||||
|
||||
/**
|
||||
* $NM_HOST
|
||||
* Final, exported by NodeManager and non-modifiable by users.
|
||||
*/
|
||||
NM_HOST("NM_HOST"),
|
||||
|
||||
/**
|
||||
* $NM_HTTP_PORT
|
||||
* Final, exported by NodeManager and non-modifiable by users.
|
||||
*/
|
||||
NM_HTTP_PORT("NM_HTTP_PORT"),
|
||||
|
||||
/**
|
||||
* $NM_PORT
|
||||
* Final, exported by NodeManager and non-modifiable by users.
|
||||
*/
|
||||
NM_PORT("NM_PORT"),
|
||||
|
||||
/**
|
||||
* $LOCAL_DIRS
|
||||
* Final, exported by NodeManager and non-modifiable by users.
|
||||
*/
|
||||
LOCAL_DIRS("LOCAL_DIRS");
|
||||
|
||||
private final String variable;
|
||||
private Environment(String variable) {
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
|
||||
|
@ -320,7 +321,7 @@ public class ApplicationMaster {
|
|||
|
||||
Map<String, String> envs = System.getenv();
|
||||
|
||||
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
||||
if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
|
||||
if (cliParser.hasOption("app_attempt_id")) {
|
||||
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
||||
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
||||
|
@ -330,7 +331,7 @@ public class ApplicationMaster {
|
|||
}
|
||||
} else {
|
||||
ContainerId containerId = ConverterUtils.toContainerId(envs
|
||||
.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
|
||||
.get(Environment.CONTAINER_ID.name()));
|
||||
appAttemptID = containerId.getApplicationAttemptId();
|
||||
}
|
||||
|
||||
|
@ -338,16 +339,16 @@ public class ApplicationMaster {
|
|||
throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
|
||||
+ " not set in the environment");
|
||||
}
|
||||
if (!envs.containsKey(ApplicationConstants.NM_HOST_ENV)) {
|
||||
throw new RuntimeException(ApplicationConstants.NM_HOST_ENV
|
||||
if (!envs.containsKey(Environment.NM_HOST.name())) {
|
||||
throw new RuntimeException(Environment.NM_HOST.name()
|
||||
+ " not set in the environment");
|
||||
}
|
||||
if (!envs.containsKey(ApplicationConstants.NM_HTTP_PORT_ENV)) {
|
||||
throw new RuntimeException(ApplicationConstants.NM_HTTP_PORT_ENV
|
||||
if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
|
||||
throw new RuntimeException(Environment.NM_HTTP_PORT
|
||||
+ " not set in the environment");
|
||||
}
|
||||
if (!envs.containsKey(ApplicationConstants.NM_PORT_ENV)) {
|
||||
throw new RuntimeException(ApplicationConstants.NM_PORT_ENV
|
||||
if (!envs.containsKey(Environment.NM_PORT.name())) {
|
||||
throw new RuntimeException(Environment.NM_PORT.name()
|
||||
+ " not set in the environment");
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -58,7 +59,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
* creates a new application on the RM and negotiates a new attempt id. Then it
|
||||
* waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
|
||||
* which it spawns the AM in another process and passes it the container id via
|
||||
* env variable ApplicationConstants.AM_CONTAINER_ID_ENV. The AM can be in any
|
||||
* env variable Environment.CONTAINER_ID. The AM can be in any
|
||||
* language. The AM can register with the RM using the attempt id obtained
|
||||
* from the container id and proceed as normal.
|
||||
* The client redirects app stdout and stderr to its own stdout and
|
||||
|
@ -190,10 +191,11 @@ public class UnmanagedAMLauncher {
|
|||
containerId.setId(0);
|
||||
|
||||
String hostname = InetAddress.getLocalHost().getHostName();
|
||||
envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId);
|
||||
envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname);
|
||||
envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0");
|
||||
envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0");
|
||||
envAMList.add(Environment.CONTAINER_ID.name() + "=" + containerId);
|
||||
envAMList.add(Environment.NM_HOST.name() + "=" + hostname);
|
||||
envAMList.add(Environment.NM_HTTP_PORT.name() + "=0");
|
||||
envAMList.add(Environment.NM_PORT.name() + "=0");
|
||||
envAMList.add(Environment.LOCAL_DIRS.name() + "= /tmp");
|
||||
envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
|
||||
+ System.currentTimeMillis());
|
||||
|
||||
|
|
|
@ -110,13 +110,13 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
|||
List<String> localDirs, List<String> logDirs) throws IOException {
|
||||
|
||||
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
|
||||
ContainerId containerId = container.getContainerID();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
|
||||
// create container dirs on all disks
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
String appIdStr =
|
||||
ConverterUtils.toString(
|
||||
container.getContainerID().getApplicationAttemptId().
|
||||
containerId.getApplicationAttemptId().
|
||||
getApplicationId());
|
||||
for (String sLocalDir : localDirs) {
|
||||
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
|
||||
|
|
|
@ -217,11 +217,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
|
|||
String user, String appId, Path containerWorkDir,
|
||||
List<String> localDirs, List<String> logDirs) throws IOException {
|
||||
|
||||
ContainerId containerId = container.getContainerID();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
|
||||
resourcesHandler.preExecute(containerId,
|
||||
container.getResource());
|
||||
container.getContainer().getResource());
|
||||
String resourcesOptions = resourcesHandler.getResourcesOption(
|
||||
containerId);
|
||||
|
||||
|
|
|
@ -36,8 +36,8 @@ public class ApplicationContainerInitEvent extends ApplicationEvent {
|
|||
final Container container;
|
||||
|
||||
public ApplicationContainerInitEvent(Container container) {
|
||||
super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
|
||||
ApplicationEventType.INIT_CONTAINER);
|
||||
super(container.getContainer().getId().getApplicationAttemptId()
|
||||
.getApplicationId(), ApplicationEventType.INIT_CONTAINER);
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
|
|
|
@ -274,14 +274,14 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationContainerInitEvent initEvent =
|
||||
(ApplicationContainerInitEvent) event;
|
||||
Container container = initEvent.getContainer();
|
||||
app.containers.put(container.getContainerID(), container);
|
||||
LOG.info("Adding " + container.getContainerID()
|
||||
app.containers.put(container.getContainer().getId(), container);
|
||||
LOG.info("Adding " + container.getContainer().getId()
|
||||
+ " to application " + app.toString());
|
||||
|
||||
switch (app.getApplicationState()) {
|
||||
case RUNNING:
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainerID()));
|
||||
container.getContainer().getId()));
|
||||
break;
|
||||
case INITING:
|
||||
case NEW:
|
||||
|
@ -302,7 +302,7 @@ public class ApplicationImpl implements Application {
|
|||
// Start all the containers waiting for ApplicationInit
|
||||
for (Container container : app.containers.values()) {
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainerID()));
|
||||
container.getContainer().getId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,12 +25,11 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
||||
public interface Container extends EventHandler<ContainerEvent> {
|
||||
|
||||
org.apache.hadoop.yarn.api.records.ContainerId getContainerID();
|
||||
org.apache.hadoop.yarn.api.records.Container getContainer();
|
||||
|
||||
String getUser();
|
||||
|
||||
|
@ -46,5 +45,4 @@ public interface Container extends EventHandler<ContainerEvent> {
|
|||
|
||||
String toString();
|
||||
|
||||
Resource getResource();
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
@ -312,16 +311,6 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerId getContainerID() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.container.getId();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUser() {
|
||||
this.readLock.lock();
|
||||
|
@ -385,10 +374,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
public org.apache.hadoop.yarn.api.records.Container getContainer() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.container.getResource();
|
||||
return this.container;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
final ContainerLaunchContext launchContext = container.getLaunchContext();
|
||||
final Map<Path,List<String>> localResources =
|
||||
container.getLocalizedResources();
|
||||
ContainerId containerID = container.getContainerID();
|
||||
ContainerId containerID = container.getContainer().getId();
|
||||
String containerIdStr = ConverterUtils.toString(containerID);
|
||||
final String user = launchContext.getUser();
|
||||
final List<String> command = launchContext.getCommands();
|
||||
|
@ -295,7 +295,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void cleanupContainer() throws IOException {
|
||||
ContainerId containerId = container.getContainerID();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
LOG.info("Cleaning up container " + containerIdStr);
|
||||
|
||||
|
@ -366,7 +366,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
*/
|
||||
private String getContainerPid(Path pidFilePath) throws Exception {
|
||||
String containerIdStr =
|
||||
ConverterUtils.toString(container.getContainerID());
|
||||
ConverterUtils.toString(container.getContainer().getId());
|
||||
String processId = null;
|
||||
LOG.debug("Accessing pid for container " + containerIdStr
|
||||
+ " from pid file " + pidFilePath);
|
||||
|
@ -484,6 +484,21 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
* Non-modifiable environment variables
|
||||
*/
|
||||
|
||||
environment.put(Environment.CONTAINER_ID.name(), container
|
||||
.getContainer().getId().toString());
|
||||
|
||||
environment.put(Environment.NM_PORT.name(),
|
||||
String.valueOf(container.getContainer().getNodeId().getPort()));
|
||||
|
||||
environment.put(Environment.NM_HOST.name(), container.getContainer()
|
||||
.getNodeId().getHost());
|
||||
|
||||
environment.put(Environment.NM_HTTP_PORT.name(), container.getContainer()
|
||||
.getNodeHttpAddress().split(":")[1]);
|
||||
|
||||
environment.put(Environment.LOCAL_DIRS.name(),
|
||||
StringUtils.join(",", appDirs));
|
||||
|
||||
putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
|
||||
|
||||
putEnvIfNotNull(environment,
|
||||
|
@ -503,11 +518,6 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
Environment.HADOOP_CONF_DIR.name(),
|
||||
System.getenv(Environment.HADOOP_CONF_DIR.name())
|
||||
);
|
||||
|
||||
putEnvIfNotNull(environment,
|
||||
ApplicationConstants.LOCAL_DIR_ENV,
|
||||
StringUtils.join(",", appDirs)
|
||||
);
|
||||
|
||||
if (!Shell.WINDOWS) {
|
||||
environment.put("JVM_PID", "$$");
|
||||
|
|
|
@ -111,7 +111,7 @@ public class ContainersLauncher extends AbstractService
|
|||
public void handle(ContainersLauncherEvent event) {
|
||||
// TODO: ContainersLauncher launches containers one by one!!
|
||||
Container container = event.getContainer();
|
||||
ContainerId containerId = container.getContainerID();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
switch (event.getType()) {
|
||||
case LAUNCH_CONTAINER:
|
||||
Application app =
|
||||
|
|
|
@ -359,13 +359,15 @@ public class ResourceLocalizationService extends CompositeService
|
|||
ContainerLocalizationRequestEvent rsrcReqs) {
|
||||
Container c = rsrcReqs.getContainer();
|
||||
LocalizerContext ctxt = new LocalizerContext(
|
||||
c.getUser(), c.getContainerID(), c.getCredentials());
|
||||
c.getUser(), c.getContainer().getId(), c.getCredentials());
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||
rsrcReqs.getRequestedResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
LocalResourcesTracker tracker =
|
||||
getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainer().getId().getApplicationAttemptId()
|
||||
.getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
|
||||
}
|
||||
|
@ -394,19 +396,21 @@ public class ResourceLocalizationService extends CompositeService
|
|||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
c.getContainer().getId().getApplicationAttemptId()
|
||||
.getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
|
||||
tracker.handle(new ResourceReleaseEvent(req,
|
||||
c.getContainer().getId()));
|
||||
}
|
||||
}
|
||||
String locId = ConverterUtils.toString(c.getContainerID());
|
||||
String locId = ConverterUtils.toString(c.getContainer().getId());
|
||||
localizerTracker.cleanupPrivLocalizers(locId);
|
||||
|
||||
// Delete the container directories
|
||||
String userName = c.getUser();
|
||||
String containerIDStr = c.toString();
|
||||
String appIDStr = ConverterUtils.toString(
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
c.getContainer().getId().getApplicationAttemptId().getApplicationId());
|
||||
for (String localDir : dirsHandler.getLocalDirs()) {
|
||||
|
||||
// Delete the user-owned container-dir
|
||||
|
@ -425,8 +429,9 @@ public class ResourceLocalizationService extends CompositeService
|
|||
delService.delete(null, containerSysDir, new Path[] {});
|
||||
}
|
||||
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerEvent(c.getContainer().getId(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ContainerInfo {
|
|||
public ContainerInfo(final Context nmContext, final Container container,
|
||||
String requestUri, String pathPrefix) {
|
||||
|
||||
this.id = container.getContainerID().toString();
|
||||
this.id = container.getContainer().getId().toString();
|
||||
this.nodeId = nmContext.getNodeId().toString();
|
||||
ContainerStatus containerData = container.cloneAndGetContainerStatus();
|
||||
this.exitCode = containerData.getExitStatus();
|
||||
|
@ -74,7 +74,7 @@ public class ContainerInfo {
|
|||
}
|
||||
|
||||
this.user = container.getUser();
|
||||
Resource res = container.getResource();
|
||||
Resource res = container.getContainer().getResource();
|
||||
if (res != null) {
|
||||
this.totalMemoryNeededMB = res.getMemory();
|
||||
}
|
||||
|
|
|
@ -88,10 +88,10 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
.getRequestedResources().values()) {
|
||||
for (LocalResourceRequest req : rc) {
|
||||
LOG.info("DEBUG: " + req + ":"
|
||||
+ rsrcReqs.getContainer().getContainerID());
|
||||
+ rsrcReqs.getContainer().getContainer().getId());
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
|
||||
.getContainerID(), req, new Path("file:///local"
|
||||
.getContainer().getId(), req, new Path("file:///local"
|
||||
+ req.getPath().toUri().getPath())));
|
||||
}
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
((ContainerLocalizationEvent) event).getContainer();
|
||||
// TODO: delete the container dir
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ContainerEvent(container.getContainerID(),
|
||||
new ContainerEvent(container.getContainer().getId(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
break;
|
||||
case DESTROY_APPLICATION_RESOURCES:
|
||||
|
@ -130,7 +130,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
@Override
|
||||
public void handle(ContainersLauncherEvent event) {
|
||||
Container container = event.getContainer();
|
||||
ContainerId containerId = container.getContainerID();
|
||||
ContainerId containerId = container.getContainer().getId();
|
||||
switch (event.getType()) {
|
||||
case LAUNCH_CONTAINER:
|
||||
dispatcher.getEventHandler().handle(
|
||||
|
|
|
@ -186,7 +186,10 @@ public class TestLinuxContainerExecutor {
|
|||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String,String>();
|
||||
|
||||
when(container.getContainerID()).thenReturn(cId);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(containerAPI);
|
||||
when(container.getContainer().getId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(context.getEnvironment()).thenReturn(env);
|
||||
|
|
|
@ -107,7 +107,10 @@ public class TestLinuxContainerExecutorWithMocks {
|
|||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String,String>();
|
||||
|
||||
when(container.getContainerID()).thenReturn(cId);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(containerAPI);
|
||||
when(container.getContainer().getId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(cId.toString()).thenReturn(containerId);
|
||||
|
@ -224,7 +227,10 @@ public class TestLinuxContainerExecutorWithMocks {
|
|||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String, String>();
|
||||
|
||||
when(container.getContainerID()).thenReturn(cId);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(containerAPI);
|
||||
when(container.getContainer().getId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(cId.toString()).thenReturn(containerId);
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -166,6 +167,9 @@ public class TestNodeManagerShutdown {
|
|||
ContainerId cId = createContainerId();
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
|
||||
when(mockContainer.getNodeId()).thenReturn(nodeId);
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:12345");
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
URL localResourceUri =
|
||||
|
|
|
@ -148,6 +148,10 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(512, 1));
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
|
@ -238,6 +242,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100 * 1024 * 1024, 1));
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
|
@ -342,6 +349,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100 * 1024 * 1024, 1));
|
||||
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
|
@ -433,6 +443,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100, 1));
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TestApplication {
|
|||
for (int i = 0; i < wa.containers.size(); i++) {
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(i)
|
||||
.getContainerID())));
|
||||
.getContainer().getId())));
|
||||
}
|
||||
} finally {
|
||||
if (wa != null)
|
||||
|
@ -108,7 +108,7 @@ public class TestApplication {
|
|||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(0)
|
||||
.getContainerID())));
|
||||
.getContainer().getId())));
|
||||
|
||||
wa.initContainer(1);
|
||||
wa.initContainer(2);
|
||||
|
@ -118,7 +118,7 @@ public class TestApplication {
|
|||
for (int i = 1; i < wa.containers.size(); i++) {
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(i)
|
||||
.getContainerID())));
|
||||
.getContainer().getId())));
|
||||
}
|
||||
} finally {
|
||||
if (wa != null)
|
||||
|
@ -233,7 +233,7 @@ public class TestApplication {
|
|||
for (int i = 1; i < wa.containers.size(); i++) {
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(i)
|
||||
.getContainerID())));
|
||||
.getContainer().getId())));
|
||||
}
|
||||
|
||||
wa.containerFinished(1);
|
||||
|
@ -354,7 +354,7 @@ public class TestApplication {
|
|||
|
||||
verify(wa.containerBus).handle(
|
||||
argThat(new ContainerKillMatcher(wa.containers.get(0)
|
||||
.getContainerID())));
|
||||
.getContainer().getId())));
|
||||
assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
wa.app.getApplicationState());
|
||||
|
||||
|
@ -487,7 +487,7 @@ public class TestApplication {
|
|||
|
||||
public void containerFinished(int containerNum) {
|
||||
app.handle(new ApplicationContainerFinishedEvent(containers.get(
|
||||
containerNum).getContainerID()));
|
||||
containerNum).getContainer().getId()));
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
|
@ -514,7 +514,10 @@ public class TestApplication {
|
|||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
|
||||
Container c = mock(Container.class);
|
||||
when(c.getContainerID()).thenReturn(cId);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(c.getContainer()).thenReturn(containerAPI);
|
||||
when(c.getContainer().getId()).thenReturn(cId);
|
||||
ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class);
|
||||
when(c.getLaunchContext()).thenReturn(launchContext);
|
||||
when(launchContext.getApplicationACLs()).thenReturn(
|
||||
|
|
|
@ -376,7 +376,7 @@ public class TestContainer {
|
|||
public boolean matches(Object o) {
|
||||
ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
|
||||
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
|
||||
&& wcf.cId == evt.getContainer().getContainerID();
|
||||
&& wcf.cId == evt.getContainer().getContainer().getId();
|
||||
}
|
||||
};
|
||||
verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
|
||||
|
@ -639,7 +639,7 @@ public class TestContainer {
|
|||
Path p = new Path(cache, rsrc.getKey());
|
||||
localPaths.put(p, Arrays.asList(rsrc.getKey()));
|
||||
// rsrc copied to p
|
||||
c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
|
||||
c.handle(new ContainerResourceLocalizedEvent(c.getContainer().getId(),
|
||||
req, p));
|
||||
}
|
||||
drainDispatcherEvents();
|
||||
|
@ -662,7 +662,8 @@ public class TestContainer {
|
|||
LocalResource rsrc = localResources.get(rsrcKey);
|
||||
LocalResourceRequest req = new LocalResourceRequest(rsrc);
|
||||
Exception e = new Exception("Fake localization error");
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainerID(), req, e));
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainer()
|
||||
.getId(), req, e));
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
|
@ -677,7 +678,7 @@ public class TestContainer {
|
|||
++counter;
|
||||
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
|
||||
Exception e = new Exception("Fake localization error");
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainerID(),
|
||||
c.handle(new ContainerResourceFailedEvent(c.getContainer().getId(),
|
||||
req, e));
|
||||
}
|
||||
drainDispatcherEvents();
|
||||
|
|
|
@ -37,6 +37,7 @@ import java.util.Map;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
|
@ -51,13 +52,13 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
||||
|
@ -141,50 +142,15 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// this is a dirty hack - but should be ok for a unittest.
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public static void setNewEnvironmentHack(Map<String, String> newenv) throws Exception {
|
||||
Class[] classes = Collections.class.getDeclaredClasses();
|
||||
Map<String, String> env = System.getenv();
|
||||
for (Class cl : classes) {
|
||||
if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
|
||||
Field field = cl.getDeclaredField("m");
|
||||
field.setAccessible(true);
|
||||
Object obj = field.get(env);
|
||||
Map<String, String> map = (Map<String, String>) obj;
|
||||
map.clear();
|
||||
map.putAll(newenv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* See if environment variable is forwarded using sanitizeEnv.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
@Test (timeout = 5000)
|
||||
public void testContainerEnvVariables() throws Exception {
|
||||
containerManager.start();
|
||||
|
||||
Map<String, String> envWithDummy = new HashMap<String, String>();
|
||||
envWithDummy.putAll(System.getenv());
|
||||
envWithDummy.put(Environment.MALLOC_ARENA_MAX.name(), "99");
|
||||
setNewEnvironmentHack(envWithDummy);
|
||||
|
||||
String malloc = System.getenv(Environment.MALLOC_ARENA_MAX.name());
|
||||
File scriptFile = new File(tmpDir, "scriptFile.sh");
|
||||
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||
File processStartFile =
|
||||
new File(tmpDir, "env_vars.txt").getAbsoluteFile();
|
||||
fileWriter.write("\numask 0"); // So that start file is readable by the test
|
||||
fileWriter.write("\necho $" + Environment.MALLOC_ARENA_MAX.name() + " > " + processStartFile);
|
||||
fileWriter.write("\necho $$ >> " + processStartFile);
|
||||
fileWriter.write("\nexec sleep 100");
|
||||
fileWriter.close();
|
||||
|
||||
assert(malloc != null && !"".equals(malloc));
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
|
@ -201,8 +167,37 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
|
||||
Map<String, String> userSetEnv = new HashMap<String, String>();
|
||||
userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
||||
userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
|
||||
userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
|
||||
userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
|
||||
userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
|
||||
containerLaunchContext.setUser(user);
|
||||
containerLaunchContext.setEnvironment(userSetEnv);
|
||||
|
||||
File scriptFile = new File(tmpDir, "scriptFile.sh");
|
||||
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||
File processStartFile =
|
||||
new File(tmpDir, "env_vars.txt").getAbsoluteFile();
|
||||
fileWriter.write("\numask 0"); // So that start file is readable by the test
|
||||
fileWriter.write("\necho $" + Environment.CONTAINER_ID.name() + " > "
|
||||
+ processStartFile);
|
||||
fileWriter.write("\necho $" + Environment.NM_HOST.name() + " >> "
|
||||
+ processStartFile);
|
||||
fileWriter.write("\necho $" + Environment.NM_PORT.name() + " >> "
|
||||
+ processStartFile);
|
||||
fileWriter.write("\necho $" + Environment.NM_HTTP_PORT.name() + " >> "
|
||||
+ processStartFile);
|
||||
fileWriter.write("\necho $" + Environment.LOCAL_DIRS.name() + " >> "
|
||||
+ processStartFile);
|
||||
fileWriter.write("\necho $$ >> " + processStartFile);
|
||||
fileWriter.write("\nexec sleep 100");
|
||||
fileWriter.close();
|
||||
|
||||
// upload the script file so that the container can run it
|
||||
URL resource_alpha =
|
||||
|
@ -243,9 +238,40 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
processStartFile.exists());
|
||||
|
||||
// Now verify the contents of the file
|
||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
|
||||
for (String localDir : localDirs) {
|
||||
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||
Path userdir = new Path(usersdir, user);
|
||||
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||
appDirs.add(new Path(appsdir, appId.toString()));
|
||||
}
|
||||
BufferedReader reader =
|
||||
new BufferedReader(new FileReader(processStartFile));
|
||||
Assert.assertEquals(malloc, reader.readLine());
|
||||
Assert.assertEquals(cId.toString(), reader.readLine());
|
||||
Assert.assertEquals(mockContainer.getNodeId().getHost(),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(
|
||||
String.valueOf(mockContainer.getNodeHttpAddress().split(":")[1]),
|
||||
reader.readLine());
|
||||
Assert.assertEquals(StringUtils.join(",", appDirs), reader.readLine());
|
||||
|
||||
Assert.assertEquals(cId.toString(), containerLaunchContext
|
||||
.getEnvironment().get(Environment.CONTAINER_ID.name()));
|
||||
Assert.assertEquals(mockContainer.getNodeId().getHost(),
|
||||
containerLaunchContext.getEnvironment()
|
||||
.get(Environment.NM_HOST.name()));
|
||||
Assert.assertEquals(String.valueOf(mockContainer.getNodeId().getPort()),
|
||||
containerLaunchContext.getEnvironment().get(
|
||||
Environment.NM_PORT.name()));
|
||||
Assert.assertEquals(
|
||||
mockContainer.getNodeHttpAddress().split(":")[1],
|
||||
containerLaunchContext.getEnvironment().get(
|
||||
Environment.NM_HTTP_PORT.name()));
|
||||
Assert.assertEquals(StringUtils.join(",", appDirs), containerLaunchContext
|
||||
.getEnvironment().get(Environment.LOCAL_DIRS.name()));
|
||||
// Get the pid of the process
|
||||
String pid = reader.readLine().trim();
|
||||
// No more lines
|
||||
|
@ -319,6 +345,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
|
|||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
|
|
|
@ -494,7 +494,7 @@ public class TestResourceLocalizationService {
|
|||
Thread.sleep(1000);
|
||||
dispatcher.await();
|
||||
String appStr = ConverterUtils.toString(appId);
|
||||
String ctnrStr = c.getContainerID().toString();
|
||||
String ctnrStr = c.getContainer().getId().toString();
|
||||
ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
|
||||
verify(exec).startLocalizer(tokenPathCaptor.capture(),
|
||||
isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr),
|
||||
|
@ -570,7 +570,7 @@ public class TestResourceLocalizationService {
|
|||
public boolean matches(Object o) {
|
||||
ContainerEvent evt = (ContainerEvent) o;
|
||||
return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
|
||||
&& c.getContainerID() == evt.getContainerID();
|
||||
&& c.getContainer().getId() == evt.getContainerID();
|
||||
}
|
||||
};
|
||||
// total 2 resource localzation calls. one for each resource.
|
||||
|
@ -759,11 +759,11 @@ public class TestResourceLocalizationService {
|
|||
|
||||
// Container - 1
|
||||
ContainerImpl container1 = createMockContainer(user, 1);
|
||||
String localizerId1 = container1.getContainerID().toString();
|
||||
String localizerId1 = container1.getContainer().getId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId1,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container1
|
||||
.getContainerID(), null), localizerId1));
|
||||
.getContainer().getId(), null), localizerId1));
|
||||
LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
|
||||
|
||||
dispatcher1.getEventHandler().handle(
|
||||
|
@ -774,11 +774,11 @@ public class TestResourceLocalizationService {
|
|||
|
||||
// Container - 2 now makes the request.
|
||||
ContainerImpl container2 = createMockContainer(user, 2);
|
||||
String localizerId2 = container2.getContainerID().toString();
|
||||
String localizerId2 = container2.getContainer().getId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId2,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container2
|
||||
.getContainerID(), null), localizerId2));
|
||||
.getContainer().getId(), null), localizerId2));
|
||||
LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
|
||||
dispatcher1.getEventHandler().handle(
|
||||
createContainerLocalizationEvent(container2,
|
||||
|
@ -919,11 +919,11 @@ public class TestResourceLocalizationService {
|
|||
|
||||
// Container - 1
|
||||
Container container1 = createMockContainer(user, 1);
|
||||
String localizerId1 = container1.getContainerID().toString();
|
||||
String localizerId1 = container1.getContainer().getId().toString();
|
||||
rls.getPrivateLocalizers().put(
|
||||
localizerId1,
|
||||
rls.new LocalizerRunner(new LocalizerContext(user, container1
|
||||
.getContainerID(), null), localizerId1));
|
||||
.getContainer().getId(), null), localizerId1));
|
||||
|
||||
// Creating two requests for container
|
||||
// 1) Private resource
|
||||
|
@ -1314,7 +1314,10 @@ public class TestResourceLocalizationService {
|
|||
|
||||
private ContainerImpl createMockContainer(String user, int containerId) {
|
||||
ContainerImpl container = mock(ContainerImpl.class);
|
||||
when(container.getContainerID()).thenReturn(
|
||||
org.apache.hadoop.yarn.api.records.Container c =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(container.getContainer()).thenReturn(c);
|
||||
when(container.getContainer().getId()).thenReturn(
|
||||
BuilderUtils.newContainerId(1, 1, 1, containerId));
|
||||
when(container.getUser()).thenReturn(user);
|
||||
Credentials mockCredentials = mock(Credentials.class);
|
||||
|
@ -1354,8 +1357,11 @@ public class TestResourceLocalizationService {
|
|||
ApplicationAttemptId appAttemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
|
||||
org.apache.hadoop.yarn.api.records.Container containerAPI =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(c.getContainer()).thenReturn(containerAPI);
|
||||
when(c.getUser()).thenReturn("user0");
|
||||
when(c.getContainerID()).thenReturn(cId);
|
||||
when(c.getContainer().getId()).thenReturn(cId);
|
||||
Credentials creds = new Credentials();
|
||||
creds.addToken(new Text("tok" + id), getToken(id));
|
||||
when(c.getCredentials()).thenReturn(creds);
|
||||
|
|
|
@ -213,6 +213,9 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|||
cId.setApplicationAttemptId(appAttemptId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
when(mockContainer.getNodeId()).thenReturn(context.getNodeId());
|
||||
when(mockContainer.getNodeHttpAddress()).thenReturn(
|
||||
context.getNodeId().getHost() + ":12345");
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
URL resource_alpha =
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class MockContainer implements Container {
|
||||
|
||||
|
@ -48,6 +49,7 @@ public class MockContainer implements Container {
|
|||
private final Map<Path, List<String>> resource =
|
||||
new HashMap<Path, List<String>>();
|
||||
private RecordFactory recordFactory;
|
||||
private org.apache.hadoop.yarn.api.records.Container mockContainer;
|
||||
|
||||
public MockContainer(ApplicationAttemptId appAttemptId,
|
||||
Dispatcher dispatcher, Configuration conf, String user,
|
||||
|
@ -62,17 +64,14 @@ public class MockContainer implements Container {
|
|||
launchContext.setUser(user);
|
||||
this.state = ContainerState.NEW;
|
||||
|
||||
mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(mockContainer.getId()).thenReturn(id);
|
||||
}
|
||||
|
||||
public void setState(ContainerState state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerId getContainerID() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUser() {
|
||||
return user;
|
||||
|
@ -119,8 +118,7 @@ public class MockContainer implements Container {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
return null;
|
||||
public org.apache.hadoop.yarn.api.records.Container getContainer() {
|
||||
return this.mockContainer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -185,16 +185,18 @@ public class TestNMWebServicesApps extends JerseyTest {
|
|||
app.getUser(), app.getAppId(), 1);
|
||||
Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
|
||||
app.getUser(), app.getAppId(), 2);
|
||||
nmContext.getContainers().put(container1.getContainerID(), container1);
|
||||
nmContext.getContainers().put(container2.getContainerID(), container2);
|
||||
nmContext.getContainers()
|
||||
.put(container1.getContainer().getId(), container1);
|
||||
nmContext.getContainers()
|
||||
.put(container2.getContainer().getId(), container2);
|
||||
|
||||
app.getContainers().put(container1.getContainerID(), container1);
|
||||
app.getContainers().put(container2.getContainerID(), container2);
|
||||
app.getContainers().put(container1.getContainer().getId(), container1);
|
||||
app.getContainers().put(container2.getContainer().getId(), container2);
|
||||
HashMap<String, String> hash = new HashMap<String, String>();
|
||||
hash.put(container1.getContainerID().toString(), container1
|
||||
.getContainerID().toString());
|
||||
hash.put(container2.getContainerID().toString(), container2
|
||||
.getContainerID().toString());
|
||||
hash.put(container1.getContainer().getId().toString(), container1
|
||||
.getContainer().getId().toString());
|
||||
hash.put(container2.getContainer().getId().toString(), container2
|
||||
.getContainer().getId().toString());
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
|
|
@ -186,16 +186,18 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
app.getUser(), app.getAppId(), 1);
|
||||
Container container2 = new MockContainer(appAttemptId, dispatcher, conf,
|
||||
app.getUser(), app.getAppId(), 2);
|
||||
nmContext.getContainers().put(container1.getContainerID(), container1);
|
||||
nmContext.getContainers().put(container2.getContainerID(), container2);
|
||||
nmContext.getContainers()
|
||||
.put(container1.getContainer().getId(), container1);
|
||||
nmContext.getContainers()
|
||||
.put(container2.getContainer().getId(), container2);
|
||||
|
||||
app.getContainers().put(container1.getContainerID(), container1);
|
||||
app.getContainers().put(container2.getContainerID(), container2);
|
||||
app.getContainers().put(container1.getContainer().getId(), container1);
|
||||
app.getContainers().put(container2.getContainer().getId(), container2);
|
||||
HashMap<String, String> hash = new HashMap<String, String>();
|
||||
hash.put(container1.getContainerID().toString(), container1
|
||||
.getContainerID().toString());
|
||||
hash.put(container2.getContainerID().toString(), container2
|
||||
.getContainerID().toString());
|
||||
hash.put(container1.getContainer().getId().toString(), container1
|
||||
.getContainer().getId().toString());
|
||||
hash.put(container2.getContainer().getId().toString(), container2
|
||||
.getContainer().getId().toString());
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
@ -468,7 +470,7 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
String state, String user, int exitCode, String diagnostics,
|
||||
String nodeId, int totalMemoryNeededMB, String logsLink)
|
||||
throws JSONException, Exception {
|
||||
WebServicesTestUtils.checkStringMatch("id", cont.getContainerID()
|
||||
WebServicesTestUtils.checkStringMatch("id", cont.getContainer().getId()
|
||||
.toString(), id);
|
||||
WebServicesTestUtils.checkStringMatch("state", cont.getContainerState()
|
||||
.toString(), state);
|
||||
|
@ -481,8 +483,9 @@ public class TestNMWebServicesContainers extends JerseyTest {
|
|||
WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId()
|
||||
.toString(), nodeId);
|
||||
assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB);
|
||||
String shortLink = ujoin("containerlogs", cont.getContainerID().toString(),
|
||||
cont.getUser());
|
||||
String shortLink =
|
||||
ujoin("containerlogs", cont.getContainer().getId().toString(),
|
||||
cont.getUser());
|
||||
assertTrue("containerLogsLink wrong", logsLink.contains(shortLink));
|
||||
}
|
||||
|
||||
|
|
|
@ -178,17 +178,7 @@ public class AMLauncher implements Runnable {
|
|||
Map<String, String> environment = container.getEnvironment();
|
||||
environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV,
|
||||
application.getWebProxyBase());
|
||||
// Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be
|
||||
// consumable by the AM.
|
||||
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV,
|
||||
containerID.toString());
|
||||
environment.put(ApplicationConstants.NM_HOST_ENV, masterContainer
|
||||
.getNodeId().getHost());
|
||||
environment.put(ApplicationConstants.NM_PORT_ENV,
|
||||
String.valueOf(masterContainer.getNodeId().getPort()));
|
||||
String parts[] =
|
||||
masterContainer.getNodeHttpAddress().split(":");
|
||||
environment.put(ApplicationConstants.NM_HTTP_PORT_ENV, parts[1]);
|
||||
// Set AppSubmitTime and MaxAppAttempts to be consumable by the AM.
|
||||
ApplicationId applicationId =
|
||||
application.getAppAttemptId().getApplicationId();
|
||||
environment.put(
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -71,17 +70,17 @@ public class TestApplicationMasterLauncher {
|
|||
launched = true;
|
||||
Map<String, String> env =
|
||||
request.getContainerLaunchContext().getEnvironment();
|
||||
containerIdAtContainerManager =
|
||||
env.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
|
||||
ContainerId containerId =
|
||||
ConverterUtils.toContainerId(containerIdAtContainerManager);
|
||||
request.getContainer().getId();
|
||||
containerIdAtContainerManager = containerId.toString();
|
||||
attemptIdAtContainerManager =
|
||||
containerId.getApplicationAttemptId().toString();
|
||||
nmHostAtContainerManager = env.get(ApplicationConstants.NM_HOST_ENV);
|
||||
nmHostAtContainerManager = request.getContainer().getNodeId().getHost();
|
||||
nmPortAtContainerManager =
|
||||
Integer.parseInt(env.get(ApplicationConstants.NM_PORT_ENV));
|
||||
request.getContainer().getNodeId().getPort();
|
||||
nmHttpPortAtContainerManager =
|
||||
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV));
|
||||
Integer.parseInt(request.getContainer().getNodeHttpAddress()
|
||||
.split(":")[1]);
|
||||
submitTimeAtContainerManager =
|
||||
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
|
||||
maxAppAttempts =
|
||||
|
|
Loading…
Reference in New Issue