MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a ContainerLaunchContext (Arun Murthy via mahadev) - Merging r1170459 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1170460 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-09-14 07:29:45 +00:00
parent f3ed02a68a
commit 90a5181501
22 changed files with 373 additions and 1080 deletions

View File

@ -263,6 +263,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via
mahadev) mahadev)
MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a
ContainerLaunchContext (Arun Murthy via mahadev)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -579,13 +579,12 @@ public abstract class TaskAttemptImpl implements
+ remoteJobConfPath.toUri().toASCIIString()); + remoteJobConfPath.toUri().toASCIIString());
// //////////// End of JobConf setup // //////////// End of JobConf setup
// Setup DistributedCache // Setup DistributedCache
setupDistributedCache(remoteFS, conf, localResources, environment); MRApps.setupDistributedCache(conf, localResources, environment);
// Set local-resources and environment // Set local-resources and environment
container.setLocalResources(localResources); container.setLocalResources(localResources);
container.setEnv(environment); container.setEnvironment(environment);
// Setup up tokens // Setup up tokens
Credentials taskCredentials = new Credentials(); Credentials taskCredentials = new Credentials();
@ -618,7 +617,7 @@ public abstract class TaskAttemptImpl implements
ShuffleHandler.serializeServiceData(jobToken)); ShuffleHandler.serializeServiceData(jobToken));
container.setServiceData(serviceData); container.setServiceData(serviceData);
MRApps.addToClassPath(container.getEnv(), getInitialClasspath()); MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
} catch (IOException e) { } catch (IOException e) {
throw new YarnException(e); throw new YarnException(e);
} }
@ -645,7 +644,7 @@ public abstract class TaskAttemptImpl implements
taskAttemptListener.getAddress(), remoteTask, javaHome, taskAttemptListener.getAddress(), remoteTask, javaHome,
workDir.toString(), containerLogDir, childTmpDir, jvmID)); workDir.toString(), containerLogDir, childTmpDir, jvmID));
MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths, MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask, workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
localizedApplicationTokensFile); localizedApplicationTokensFile);
@ -656,116 +655,6 @@ public abstract class TaskAttemptImpl implements
return container; return container;
} }
private static long[] parseTimeStamps(String[] strs) {
if (null == strs) {
return null;
}
long[] result = new long[strs.length];
for(int i=0; i < strs.length; ++i) {
result[i] = Long.parseLong(strs[i]);
}
return result;
}
private void setupDistributedCache(FileSystem remoteFS,
Configuration conf,
Map<String, LocalResource> localResources,
Map<String, String> env)
throws IOException {
// Cache archives
parseDistributedCacheArtifacts(remoteFS, localResources, env,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
DistributedCache.getArchiveVisibilities(conf),
DistributedCache.getArchiveClassPaths(conf));
// Cache files
parseDistributedCacheArtifacts(remoteFS,
localResources, env,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
DistributedCache.getFileVisibilities(conf),
DistributedCache.getFileClassPaths(conf));
}
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType)
private void parseDistributedCacheArtifacts(
FileSystem remoteFS,
Map<String, LocalResource> localResources,
Map<String, String> env,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
if (uris != null) {
// Sanity check
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
(uris.length != visibilities.length)) {
throw new IllegalArgumentException("Invalid specification for " +
"distributed-cache artifacts of type " + type + " :" +
" #uris=" + uris.length +
" #timestamps=" + timestamps.length +
" #visibilities=" + visibilities.length
);
}
Map<String, Path> classPaths = new HashMap<String, Path>();
if (pathsToPutOnClasspath != null) {
for (Path p : pathsToPutOnClasspath) {
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
: u.getFragment());
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
}
String linkName = name.toUri().getPath();
localResources.put(
linkName,
BuilderUtils.newLocalResource(
p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
if (classPaths.containsKey(u.getPath())) {
MRApps.addToClassPath(env, linkName);
}
}
}
}
// TODO - Move this to MR!
private static long[] getFileSizes(Configuration conf, String key) {
String[] strs = conf.getStrings(key);
if (strs == null) {
return null;
}
long[] result = new long[strs.length];
for(int i=0; i < strs.length; ++i) {
result[i] = Long.parseLong(strs[i]);
}
return result;
}
@Override @Override
public ContainerId getAssignedContainerID() { public ContainerId getAssignedContainerID() {
readLock.lock(); readLock.lock();

View File

@ -25,14 +25,20 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -42,12 +48,18 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
/** /**
* Helper class for MR applications * Helper class for MR applications
*/ */
@Private
@Unstable
public class MRApps extends Apps { public class MRApps extends Apps {
public static final String JOB = "job"; public static final String JOB = "job";
public static final String TASK = "task"; public static final String TASK = "task";
@ -232,4 +244,121 @@ public class MRApps extends Apps {
jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE); jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
return jobFile.toString(); return jobFile.toString();
} }
private static long[] parseTimeStamps(String[] strs) {
if (null == strs) {
return null;
}
long[] result = new long[strs.length];
for(int i=0; i < strs.length; ++i) {
result[i] = Long.parseLong(strs[i]);
}
return result;
}
public static void setupDistributedCache(
Configuration conf,
Map<String, LocalResource> localResources,
Map<String, String> env)
throws IOException {
// Cache archives
parseDistributedCacheArtifacts(conf, localResources, env,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
DistributedCache.getArchiveVisibilities(conf),
DistributedCache.getArchiveClassPaths(conf));
// Cache files
parseDistributedCacheArtifacts(conf,
localResources, env,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
DistributedCache.getFileVisibilities(conf),
DistributedCache.getFileClassPaths(conf));
}
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType)
private static void parseDistributedCacheArtifacts(
Configuration conf,
Map<String, LocalResource> localResources,
Map<String, String> env,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
if (uris != null) {
// Sanity check
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
(uris.length != visibilities.length)) {
throw new IllegalArgumentException("Invalid specification for " +
"distributed-cache artifacts of type " + type + " :" +
" #uris=" + uris.length +
" #timestamps=" + timestamps.length +
" #visibilities=" + visibilities.length
);
}
Map<String, Path> classPaths = new HashMap<String, Path>();
if (pathsToPutOnClasspath != null) {
for (Path p : pathsToPutOnClasspath) {
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
FileSystem remoteFS = p.getFileSystem(conf);
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
: u.getFragment());
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
}
String linkName = name.toUri().getPath();
localResources.put(
linkName,
BuilderUtils.newLocalResource(
p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
if (classPaths.containsKey(u.getPath())) {
MRApps.addToClassPath(env, linkName);
}
}
}
}
// TODO - Move this to MR!
private static long[] getFileSizes(Configuration conf, String key) {
String[] strs = conf.getStrings(key);
if (strs == null) {
return null;
}
long[] result = new long[strs.length];
for(int i=0; i < strs.length; ++i) {
result[i] = Long.parseLong(strs[i]);
}
return result;
}
} }

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -33,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -55,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.MRConstants;
@ -72,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -237,7 +235,6 @@ public class YARNRunner implements ClientProtocol {
// Construct necessary information to start the MR AM // Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts); createApplicationSubmissionContext(conf, jobSubmitDir, ts);
setupDistributedCache(conf, appContext);
// XXX Remove // XXX Remove
in.close(); in.close();
@ -273,16 +270,18 @@ public class YARNRunner implements ClientProtocol {
public ApplicationSubmissionContext createApplicationSubmissionContext( public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf, Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException { String jobSubmitDir, Credentials ts) throws IOException {
ApplicationSubmissionContext appContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
ApplicationId applicationId = resMgrDelegate.getApplicationId(); ApplicationId applicationId = resMgrDelegate.getApplicationId();
appContext.setApplicationId(applicationId);
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class); Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB)); MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
LOG.info("AppMaster capability = " + capability); LOG.info("AppMaster capability = " + capability);
appContext.setMasterCapability(capability);
// Setup LocalResources
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE); Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = ConverterUtils URL yarnUrlForJobSubmitDir = ConverterUtils
@ -292,14 +291,11 @@ public class YARNRunner implements ClientProtocol {
LOG.debug("Creating setup context, jobSubmitDir url is " LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir); + yarnUrlForJobSubmitDir);
appContext.setResource(MRConstants.JOB_SUBMIT_DIR, localResources.put(MRConstants.JOB_CONF_FILE,
yarnUrlForJobSubmitDir);
appContext.setResourceTodo(MRConstants.JOB_CONF_FILE,
createApplicationResource(defaultFileContext, createApplicationResource(defaultFileContext,
jobConfPath)); jobConfPath));
if (jobConf.get(MRJobConfig.JAR) != null) { if (jobConf.get(MRJobConfig.JAR) != null) {
appContext.setResourceTodo(MRConstants.JOB_JAR, localResources.put(MRConstants.JOB_JAR,
createApplicationResource(defaultFileContext, createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, MRConstants.JOB_JAR))); new Path(jobSubmitDir, MRConstants.JOB_JAR)));
} else { } else {
@ -312,30 +308,21 @@ public class YARNRunner implements ClientProtocol {
// TODO gross hack // TODO gross hack
for (String s : new String[] { "job.split", "job.splitmetainfo", for (String s : new String[] { "job.split", "job.splitmetainfo",
MRConstants.APPLICATION_TOKENS_FILE }) { MRConstants.APPLICATION_TOKENS_FILE }) {
appContext.setResourceTodo( localResources.put(
MRConstants.JOB_SUBMIT_DIR + "/" + s, MRConstants.JOB_SUBMIT_DIR + "/" + s,
createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s))); createApplicationResource(defaultFileContext,
} new Path(jobSubmitDir, s)));
// TODO: Only if security is on.
List<String> fsTokens = new ArrayList<String>();
for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
fsTokens.add(token.encodeToUrlString());
} }
// TODO - Remove this! // Setup security tokens
appContext.addAllFsTokens(fsTokens); ByteBuffer securityTokens = null;
DataOutputBuffer dob = new DataOutputBuffer(); if (UserGroupInformation.isSecurityEnabled()) {
ts.writeTokenStorageToStream(dob); DataOutputBuffer dob = new DataOutputBuffer();
appContext.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); ts.writeTokenStorageToStream(dob);
securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
// Add queue information // Setup the command to run the AM
appContext.setQueue(jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME));
// Add job name
appContext.setApplicationName(jobConf.get(JobContext.JOB_NAME, "N/A"));
// Add the command line
String javaHome = "$JAVA_HOME"; String javaHome = "$JAVA_HOME";
Vector<CharSequence> vargs = new Vector<CharSequence>(8); Vector<CharSequence> vargs = new Vector<CharSequence>(8);
vargs.add(javaHome + "/bin/java"); vargs.add(javaHome + "/bin/java");
@ -346,13 +333,6 @@ public class YARNRunner implements ClientProtocol {
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS)); MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
// Add { job jar, MR app jar } to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setInitialClasspath(environment);
MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
MRApps.addToClassPath(environment,
MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
appContext.addAllEnvironment(environment);
vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster"); vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
vargs.add(String.valueOf(applicationId.getClusterTimestamp())); vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
vargs.add(String.valueOf(applicationId.getId())); vargs.add(String.valueOf(applicationId.getId()));
@ -370,140 +350,43 @@ public class YARNRunner implements ClientProtocol {
LOG.info("Command to launch container for ApplicationMaster is : " LOG.info("Command to launch container for ApplicationMaster is : "
+ mergedCommand); + mergedCommand);
// Setup the environment - Add { job jar, MR app jar } to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setInitialClasspath(environment);
MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
MRApps.addToClassPath(environment,
MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources, environment);
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
amContainer.setResource(capability); // Resource (mem) required
amContainer.setLocalResources(localResources); // Local resources
amContainer.setEnvironment(environment); // Environment
amContainer.setCommands(vargsFinal); // Command for AM
amContainer.setContainerTokens(securityTokens); // Security tokens
// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appContext.setApplicationId(applicationId); // ApplicationId
appContext.setUser( // User name
UserGroupInformation.getCurrentUser().getShortUserName());
appContext.setQueue( // Queue name
jobConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME));
appContext.setApplicationName( // Job name
jobConf.get(JobContext.JOB_NAME,
YarnConfiguration.DEFAULT_APPLICATION_NAME));
appContext.setAMContainerSpec(amContainer); // AM Container
appContext.addAllCommands(vargsFinal);
// TODO: RM should get this from RPC.
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
return appContext; return appContext;
} }
/**
* * TODO: Copied for now from TaskAttemptImpl.java ... fixme
* @param strs
* @return
*/
private static long[] parseTimeStamps(String[] strs) {
if (null == strs) {
return null;
}
long[] result = new long[strs.length];
for(int i=0; i < strs.length; ++i) {
result[i] = Long.parseLong(strs[i]);
}
return result;
}
/**
* TODO: Copied for now from TaskAttemptImpl.java ... fixme
*
* TODO: This is currently needed in YarnRunner as user code like setupJob,
* cleanupJob may need access to dist-cache. Once we separate distcache for
* maps, reduces, setup etc, this can include only a subset of artificats.
* This is also needed for uberAM case where we run everything inside AM.
*/
private void setupDistributedCache(Configuration conf,
ApplicationSubmissionContext container) throws IOException {
// Cache archives
parseDistributedCacheArtifacts(conf, container, LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
DistributedCache.getArchiveVisibilities(conf),
DistributedCache.getArchiveClassPaths(conf));
// Cache files
parseDistributedCacheArtifacts(conf, container, LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
DistributedCache.getFileVisibilities(conf),
DistributedCache.getFileClassPaths(conf));
}
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
private void parseDistributedCacheArtifacts(Configuration conf,
ApplicationSubmissionContext container, LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
if (uris != null) {
// Sanity check
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
(uris.length != visibilities.length)) {
throw new IllegalArgumentException("Invalid specification for " +
"distributed-cache artifacts of type " + type + " :" +
" #uris=" + uris.length +
" #timestamps=" + timestamps.length +
" #visibilities=" + visibilities.length
);
}
Map<String, Path> classPaths = new HashMap<String, Path>();
if (pathsToPutOnClasspath != null) {
for (Path p : pathsToPutOnClasspath) {
FileSystem fs = p.getFileSystem(conf);
p = p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(
p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
: u.getFragment());
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
}
String linkName = name.toUri().getPath();
container.setResourceTodo(
linkName,
createLocalResource(
p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
if (classPaths.containsKey(u.getPath())) {
Map<String, String> environment = container.getAllEnvironment();
MRApps.addToClassPath(environment, linkName);
}
}
}
}
// TODO - Move this to MR!
private static long[] getFileSizes(Configuration conf, String key) {
String[] strs = conf.getStrings(key);
if (strs == null) {
return null;
}
long[] result = new long[strs.length];
for(int i=0; i < strs.length; ++i) {
result[i] = Long.parseLong(strs[i]);
}
return result;
}
private LocalResource createLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility,
long size, long timestamp) throws IOException {
LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
resource.setType(type);
resource.setVisibility(visibility);
resource.setSize(size);
resource.setTimestamp(timestamp);
return resource;
}
@Override @Override
public void setJobPriority(JobID arg0, String arg1) throws IOException, public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException { InterruptedException {

View File

@ -18,14 +18,8 @@
package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
/** /**
@ -36,26 +30,17 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol;
* <p>It includes details such as: * <p>It includes details such as:
* <ul> * <ul>
* <li>{@link ApplicationId} of the application.</li> * <li>{@link ApplicationId} of the application.</li>
* <li>
* {@link Resource} necessary to run the <code>ApplicationMaster</code>.
* </li>
* <li>Application user.</li> * <li>Application user.</li>
* <li>Application name.</li> * <li>Application name.</li>
* <li>{@link Priority} of the application.</li> * <li>{@link Priority} of the application.</li>
* <li>Security tokens (if security is enabled).</li>
* <li> * <li>
* {@link LocalResource} necessary for running the * {@link ContainerLaunchContext} of the container in which the
* <code>ApplicationMaster</code> container such * <code>ApplicationMaster</code> is executed.
* as binaries, jar, shared-objects, side-files etc.
* </li> * </li>
* <li>
* Environment variables for the launched <code>ApplicationMaster</code>
* process.
* </li>
* <li>Command to launch the <code>ApplicationMaster</code>.</li>
* </ul> * </ul>
* </p> * </p>
* *
* @see ContainerLaunchContext
* @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest) * @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest)
*/ */
@Public @Public
@ -143,198 +128,25 @@ public interface ApplicationSubmissionContext {
public void setUser(String user); public void setUser(String user);
/** /**
* Get the <code>Resource</code> required to run the * Get the <code>ContainerLaunchContext</code> to describe the
* <code>ApplicationMaster</code>. * <code>Container</code> with which the <code>ApplicationMaster</code> is
* @return <code>Resource</code> required to run the * launched.
* <code>ApplicationMaster</code> * @return <code>ContainerLaunchContext</code> for the
* <code>ApplicationMaster</code> container
*/ */
@Public @Public
@Stable @Stable
public Resource getMasterCapability(); public ContainerLaunchContext getAMContainerSpec();
/** /**
* Set <code>Resource</code> required to run the * Set the <code>ContainerLaunchContext</code> to describe the
* <code>ApplicationMaster</code>. * <code>Container</code> with which the <code>ApplicationMaster</code> is
* @param masterCapability <code>Resource</code> required to run the * launched.
* <code>ApplicationMaster</code> * @param amContainer <code>ContainerLaunchContext</code> for the
* <code>ApplicationMaster</code> container
*/ */
@Public @Public
@Stable @Stable
public void setMasterCapability(Resource masterCapability); public void setAMContainerSpec(ContainerLaunchContext amContainer);
@Private
@Unstable
public Map<String, URL> getAllResources();
@Private
@Unstable
public URL getResource(String key);
@Private
@Unstable
public void addAllResources(Map<String, URL> resources);
@Private
@Unstable
public void setResource(String key, URL url);
@Private
@Unstable
public void removeResource(String key);
@Private
@Unstable
public void clearResources();
/**
* Get all the <code>LocalResource</code> required to run the
* <code>ApplicationMaster</code>.
* @return <code>LocalResource</code> required to run the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public Map<String, LocalResource> getAllResourcesTodo();
@Private
@Unstable
public LocalResource getResourceTodo(String key);
/**
* Add all the <code>LocalResource</code> required to run the
* <code>ApplicationMaster</code>.
* @param resources all <code>LocalResource</code> required to run the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public void addAllResourcesTodo(Map<String, LocalResource> resources);
@Private
@Unstable
public void setResourceTodo(String key, LocalResource localResource);
@Private
@Unstable
public void removeResourceTodo(String key);
@Private
@Unstable
public void clearResourcesTodo();
@Private
@Unstable
public List<String> getFsTokenList();
@Private
@Unstable
public String getFsToken(int index);
@Private
@Unstable
public int getFsTokenCount();
@Private
@Unstable
public void addAllFsTokens(List<String> fsTokens);
@Private
@Unstable
public void addFsToken(String fsToken);
@Private
@Unstable
public void removeFsToken(int index);
@Private
@Unstable
public void clearFsTokens();
/**
* Get <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
* @return file-system tokens for the <code>ApplicationMaster</code>
*/
@Public
@Stable
public ByteBuffer getFsTokensTodo();
/**
* Set <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
* @param fsTokens file-system tokens for the <code>ApplicationMaster</code>
*/
@Public
@Stable
public void setFsTokensTodo(ByteBuffer fsTokens);
/**
* Get the <em>environment variables</em> for the
* <code>ApplicationMaster</code>.
* @return environment variables for the <code>ApplicationMaster</code>
*/
@Public
@Stable
public Map<String, String> getAllEnvironment();
@Private
@Unstable
public String getEnvironment(String key);
/**
* Add all of the <em>environment variables</em> for the
* <code>ApplicationMaster</code>.
* @param environment environment variables for the
* <code>ApplicationMaster</code>
*/
@Public
@Stable
public void addAllEnvironment(Map<String, String> environment);
@Private
@Unstable
public void setEnvironment(String key, String env);
@Private
@Unstable
public void removeEnvironment(String key);
@Private
@Unstable
public void clearEnvironment();
/**
* Get the <em>commands</em> to launch the <code>ApplicationMaster</code>.
* @return commands to launch the <code>ApplicationMaster</code>
*/
@Public
@Stable
public List<String> getCommandList();
@Private
@Unstable
public String getCommand(int index);
@Private
@Unstable
public int getCommandCount();
/**
* Add all of the <em>commands</em> to launch the
* <code>ApplicationMaster</code>.
* @param commands commands to launch the <code>ApplicationMaster</code>
*/
@Public
@Stable
public void addAllCommands(List<String> commands);
@Private
@Unstable
public void addCommand(String command);
@Private
@Unstable
public void removeCommand(int index);
@Private
@Unstable
public void clearCommands();
} }

View File

@ -156,7 +156,7 @@ public interface ContainerLaunchContext {
*/ */
@Public @Public
@Stable @Stable
Map<String, String> getEnv(); Map<String, String> getEnvironment();
/** /**
* Add <em>environment variables</em> for the container. * Add <em>environment variables</em> for the container.
@ -164,7 +164,7 @@ public interface ContainerLaunchContext {
*/ */
@Public @Public
@Stable @Stable
void setEnv(Map<String, String> environment); void setEnvironment(Map<String, String> environment);
/** /**
* Get the list of <em>commands</em> for launching the container. * Get the list of <em>commands</em> for launching the container.

View File

@ -18,56 +18,35 @@
package org.apache.hadoop.yarn.api.records.impl.pb; package org.apache.hadoop.yarn.api.records.impl.pb;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringURLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.URLProto;
public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSubmissionContextProto> implements ApplicationSubmissionContext { public class ApplicationSubmissionContextPBImpl
ApplicationSubmissionContextProto proto = ApplicationSubmissionContextProto.getDefaultInstance(); extends ProtoBase<ApplicationSubmissionContextProto>
implements ApplicationSubmissionContext {
ApplicationSubmissionContextProto proto =
ApplicationSubmissionContextProto.getDefaultInstance();
ApplicationSubmissionContextProto.Builder builder = null; ApplicationSubmissionContextProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private ApplicationId applicationId = null; private ApplicationId applicationId = null;
private Resource masterCapability = null;
private Map<String, URL> resources = null;
private Map<String, LocalResource> resourcesTodo = null;
private List<String> fsTokenList = null;
private ByteBuffer fsTokenTodo = null;
private Map<String, String> environment = null;
private List<String> commandList = null;
private Priority priority = null; private Priority priority = null;
private ContainerLaunchContext amContainer = null;
public ApplicationSubmissionContextPBImpl() { public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder(); builder = ApplicationSubmissionContextProto.newBuilder();
} }
public ApplicationSubmissionContextPBImpl(ApplicationSubmissionContextProto proto) { public ApplicationSubmissionContextPBImpl(
ApplicationSubmissionContextProto proto) {
this.proto = proto; this.proto = proto;
viaProto = true; viaProto = true;
} }
@ -83,30 +62,12 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
if (this.applicationId != null) { if (this.applicationId != null) {
builder.setApplicationId(convertToProtoFormat(this.applicationId)); builder.setApplicationId(convertToProtoFormat(this.applicationId));
} }
if (this.masterCapability != null) {
builder.setMasterCapability(convertToProtoFormat(this.masterCapability));
}
if (this.resources != null) {
addResourcesToProto();
}
if (this.resourcesTodo != null) {
addResourcesTodoToProto();
}
if (this.fsTokenList != null) {
addFsTokenListToProto();
}
if (this.fsTokenTodo != null) {
builder.setFsTokensTodo(convertToProtoFormat(this.fsTokenTodo));
}
if (this.environment != null) {
addEnvironmentToProto();
}
if (this.commandList != null) {
addCommandsToProto();
}
if (this.priority != null) { if (this.priority != null) {
builder.setPriority(convertToProtoFormat(this.priority)); builder.setPriority(convertToProtoFormat(this.priority));
} }
if (this.amContainer != null) {
builder.setAmContainerSpec(convertToProtoFormat(this.amContainer));
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -145,6 +106,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
builder.clearPriority(); builder.clearPriority();
this.priority = priority; this.priority = priority;
} }
@Override @Override
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@ -165,6 +127,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
builder.clearApplicationId(); builder.clearApplicationId();
this.applicationId = applicationId; this.applicationId = applicationId;
} }
@Override @Override
public String getApplicationName() { public String getApplicationName() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@ -183,403 +146,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
} }
builder.setApplicationName((applicationName)); builder.setApplicationName((applicationName));
} }
@Override
public Resource getMasterCapability() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.masterCapability != null) {
return masterCapability;
} // Else via proto
if (!p.hasMasterCapability()) {
return null;
}
masterCapability = convertFromProtoFormat(p.getMasterCapability());
return this.masterCapability;
}
@Override
public void setMasterCapability(Resource masterCapability) {
maybeInitBuilder();
if (masterCapability == null)
builder.clearMasterCapability();
this.masterCapability = masterCapability;
}
@Override
public Map<String, URL> getAllResources() {
initResources();
return this.resources;
}
@Override
public URL getResource(String key) {
initResources();
return this.resources.get(key);
}
private void initResources() {
if (this.resources != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<StringURLMapProto> mapAsList = p.getResourcesList();
this.resources = new HashMap<String, URL>();
for (StringURLMapProto c : mapAsList) {
this.resources.put(c.getKey(), convertFromProtoFormat(c.getValue()));
}
}
@Override
public void addAllResources(final Map<String, URL> resources) {
if (resources == null)
return;
initResources();
this.resources.putAll(resources);
}
private void addResourcesToProto() {
maybeInitBuilder();
builder.clearResources();
if (this.resources == null)
return;
Iterable<StringURLMapProto> iterable = new Iterable<StringURLMapProto>() {
@Override
public Iterator<StringURLMapProto> iterator() {
return new Iterator<StringURLMapProto>() {
Iterator<String> keyIter = resources.keySet().iterator();
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public StringURLMapProto next() {
String key = keyIter.next();
return StringURLMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resources.get(key))).build();
}
@Override
public boolean hasNext() {
return keyIter.hasNext();
}
};
}
};
builder.addAllResources(iterable);
}
@Override
public void setResource(String key, URL val) {
initResources();
this.resources.put(key, val);
}
@Override
public void removeResource(String key) {
initResources();
this.resources.remove(key);
}
@Override
public void clearResources() {
initResources();
this.resources.clear();
}
@Override
public Map<String, LocalResource> getAllResourcesTodo() {
initResourcesTodo();
return this.resourcesTodo;
}
@Override
public LocalResource getResourceTodo(String key) {
initResourcesTodo();
return this.resourcesTodo.get(key);
}
private void initResourcesTodo() {
if (this.resourcesTodo != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<StringLocalResourceMapProto> mapAsList = p.getResourcesTodoList();
this.resourcesTodo = new HashMap<String, LocalResource>();
for (StringLocalResourceMapProto c : mapAsList) {
this.resourcesTodo.put(c.getKey(), convertFromProtoFormat(c.getValue()));
}
}
@Override
public void addAllResourcesTodo(final Map<String, LocalResource> resourcesTodo) {
if (resourcesTodo == null)
return;
initResourcesTodo();
this.resourcesTodo.putAll(resourcesTodo);
}
private void addResourcesTodoToProto() {
maybeInitBuilder();
builder.clearResourcesTodo();
if (resourcesTodo == null)
return;
Iterable<StringLocalResourceMapProto> iterable = new Iterable<StringLocalResourceMapProto>() {
@Override
public Iterator<StringLocalResourceMapProto> iterator() {
return new Iterator<StringLocalResourceMapProto>() {
Iterator<String> keyIter = resourcesTodo.keySet().iterator();
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public StringLocalResourceMapProto next() {
String key = keyIter.next();
return StringLocalResourceMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resourcesTodo.get(key))).build();
}
@Override
public boolean hasNext() {
return keyIter.hasNext();
}
};
}
};
builder.addAllResourcesTodo(iterable);
}
@Override
public void setResourceTodo(String key, LocalResource val) {
initResourcesTodo();
this.resourcesTodo.put(key, val);
}
@Override
public void removeResourceTodo(String key) {
initResourcesTodo();
this.resourcesTodo.remove(key);
}
@Override
public void clearResourcesTodo() {
initResourcesTodo();
this.resourcesTodo.clear();
}
@Override
public List<String> getFsTokenList() {
initFsTokenList();
return this.fsTokenList;
}
@Override
public String getFsToken(int index) {
initFsTokenList();
return this.fsTokenList.get(index);
}
@Override
public int getFsTokenCount() {
initFsTokenList();
return this.fsTokenList.size();
}
private void initFsTokenList() {
if (this.fsTokenList != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<String> list = p.getFsTokensList();
this.fsTokenList = new ArrayList<String>();
for (String c : list) {
this.fsTokenList.add(c);
}
}
@Override
public void addAllFsTokens(final List<String> fsTokens) {
if (fsTokens == null)
return;
initFsTokenList();
this.fsTokenList.addAll(fsTokens);
}
private void addFsTokenListToProto() {
maybeInitBuilder();
builder.clearFsTokens();
builder.addAllFsTokens(this.fsTokenList);
}
@Override
public void addFsToken(String fsTokens) {
initFsTokenList();
this.fsTokenList.add(fsTokens);
}
@Override
public void removeFsToken(int index) {
initFsTokenList();
this.fsTokenList.remove(index);
}
@Override
public void clearFsTokens() {
initFsTokenList();
this.fsTokenList.clear();
}
@Override
public ByteBuffer getFsTokensTodo() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.fsTokenTodo != null) {
return this.fsTokenTodo;
}
if (!p.hasFsTokensTodo()) {
return null;
}
this.fsTokenTodo = convertFromProtoFormat(p.getFsTokensTodo());
return this.fsTokenTodo;
}
@Override
public void setFsTokensTodo(ByteBuffer fsTokensTodo) {
maybeInitBuilder();
if (fsTokensTodo == null)
builder.clearFsTokensTodo();
this.fsTokenTodo = fsTokensTodo;
}
@Override
public Map<String, String> getAllEnvironment() {
initEnvironment();
return this.environment;
}
@Override
public String getEnvironment(String key) {
initEnvironment();
return this.environment.get(key);
}
private void initEnvironment() {
if (this.environment != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<StringStringMapProto> mapAsList = p.getEnvironmentList();
this.environment = new HashMap<String, String>();
for (StringStringMapProto c : mapAsList) {
this.environment.put(c.getKey(), c.getValue());
}
}
@Override
public void addAllEnvironment(Map<String, String> environment) {
if (environment == null)
return;
initEnvironment();
this.environment.putAll(environment);
}
private void addEnvironmentToProto() {
maybeInitBuilder();
builder.clearEnvironment();
if (environment == null)
return;
Iterable<StringStringMapProto> iterable = new Iterable<StringStringMapProto>() {
@Override
public Iterator<StringStringMapProto> iterator() {
return new Iterator<StringStringMapProto>() {
Iterator<String> keyIter = environment.keySet().iterator();
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public StringStringMapProto next() {
String key = keyIter.next();
return StringStringMapProto.newBuilder().setKey(key).setValue((environment.get(key))).build();
}
@Override
public boolean hasNext() {
return keyIter.hasNext();
}
};
}
};
builder.addAllEnvironment(iterable);
}
@Override
public void setEnvironment(String key, String val) {
initEnvironment();
this.environment.put(key, val);
}
@Override
public void removeEnvironment(String key) {
initEnvironment();
this.environment.remove(key);
}
@Override
public void clearEnvironment() {
initEnvironment();
this.environment.clear();
}
@Override
public List<String> getCommandList() {
initCommandList();
return this.commandList;
}
@Override
public String getCommand(int index) {
initCommandList();
return this.commandList.get(index);
}
@Override
public int getCommandCount() {
initCommandList();
return this.commandList.size();
}
private void initCommandList() {
if (this.commandList != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<String> list = p.getCommandList();
this.commandList = new ArrayList<String>();
for (String c : list) {
this.commandList.add(c);
}
}
@Override
public void addAllCommands(final List<String> command) {
if (command == null)
return;
initCommandList();
this.commandList.addAll(command);
}
private void addCommandsToProto() {
maybeInitBuilder();
builder.clearCommand();
if (this.commandList == null)
return;
builder.addAllCommand(this.commandList);
}
@Override
public void addCommand(String command) {
initCommandList();
this.commandList.add(command);
}
@Override
public void removeCommand(int index) {
initCommandList();
this.commandList.remove(index);
}
@Override
public void clearCommands() {
initCommandList();
this.commandList.clear();
}
@Override @Override
public String getQueue() { public String getQueue() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@ -598,6 +165,7 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
} }
builder.setQueue((queue)); builder.setQueue((queue));
} }
@Override @Override
public String getUser() { public String getUser() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@ -617,6 +185,28 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
builder.setUser((user)); builder.setUser((user));
} }
@Override
public ContainerLaunchContext getAMContainerSpec() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.amContainer != null) {
return amContainer;
} // Else via proto
if (!p.hasAmContainerSpec()) {
return null;
}
amContainer = convertFromProtoFormat(p.getAmContainerSpec());
return amContainer;
}
@Override
public void setAMContainerSpec(ContainerLaunchContext amContainer) {
maybeInitBuilder();
if (amContainer == null) {
builder.clearAmContainerSpec();
}
this.amContainer = amContainer;
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p); return new PriorityPBImpl(p);
} }
@ -633,28 +223,12 @@ public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSub
return ((ApplicationIdPBImpl)t).getProto(); return ((ApplicationIdPBImpl)t).getProto();
} }
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { private ContainerLaunchContextPBImpl convertFromProtoFormat(
return new ResourcePBImpl(p); ContainerLaunchContextProto p) {
return new ContainerLaunchContextPBImpl(p);
} }
private ResourceProto convertToProtoFormat(Resource t) { private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) {
return ((ResourcePBImpl)t).getProto(); return ((ContainerLaunchContextPBImpl)t).getProto();
} }
private URLPBImpl convertFromProtoFormat(URLProto p) {
return new URLPBImpl(p);
}
private URLProto convertToProtoFormat(URL t) {
return ((URLPBImpl)t).getProto();
}
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
return new LocalResourcePBImpl(p);
}
private LocalResourceProto convertToProtoFormat(LocalResource t) {
return ((LocalResourcePBImpl)t).getProto();
}
} }

View File

@ -39,8 +39,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
public class ContainerLaunchContextPBImpl public class ContainerLaunchContextPBImpl
extends ProtoBase<ContainerLaunchContextProto> extends ProtoBase<ContainerLaunchContextProto>
implements ContainerLaunchContext { implements ContainerLaunchContext {
@ -54,10 +52,9 @@ implements ContainerLaunchContext {
private Map<String, LocalResource> localResources = null; private Map<String, LocalResource> localResources = null;
private ByteBuffer containerTokens = null; private ByteBuffer containerTokens = null;
private Map<String, ByteBuffer> serviceData = null; private Map<String, ByteBuffer> serviceData = null;
private Map<String, String> env = null; private Map<String, String> environment = null;
private List<String> commands = null; private List<String> commands = null;
public ContainerLaunchContextPBImpl() { public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder(); builder = ContainerLaunchContextProto.newBuilder();
} }
@ -94,7 +91,7 @@ implements ContainerLaunchContext {
if (this.serviceData != null) { if (this.serviceData != null) {
addServiceDataToProto(); addServiceDataToProto();
} }
if (this.env != null) { if (this.environment != null) {
addEnvToProto(); addEnvToProto();
} }
if (this.commands != null) { if (this.commands != null) {
@ -364,37 +361,37 @@ implements ContainerLaunchContext {
} }
@Override @Override
public Map<String, String> getEnv() { public Map<String, String> getEnvironment() {
initEnv(); initEnv();
return this.env; return this.environment;
} }
private void initEnv() { private void initEnv() {
if (this.env != null) { if (this.environment != null) {
return; return;
} }
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
List<StringStringMapProto> list = p.getEnvList(); List<StringStringMapProto> list = p.getEnvironmentList();
this.env = new HashMap<String, String>(); this.environment = new HashMap<String, String>();
for (StringStringMapProto c : list) { for (StringStringMapProto c : list) {
this.env.put(c.getKey(), c.getValue()); this.environment.put(c.getKey(), c.getValue());
} }
} }
@Override @Override
public void setEnv(final Map<String, String> env) { public void setEnvironment(final Map<String, String> env) {
if (env == null) if (env == null)
return; return;
initEnv(); initEnv();
this.env.clear(); this.environment.clear();
this.env.putAll(env); this.environment.putAll(env);
} }
private void addEnvToProto() { private void addEnvToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearEnv(); builder.clearEnvironment();
if (env == null) if (environment == null)
return; return;
Iterable<StringStringMapProto> iterable = Iterable<StringStringMapProto> iterable =
new Iterable<StringStringMapProto>() { new Iterable<StringStringMapProto>() {
@ -403,7 +400,7 @@ implements ContainerLaunchContext {
public Iterator<StringStringMapProto> iterator() { public Iterator<StringStringMapProto> iterator() {
return new Iterator<StringStringMapProto>() { return new Iterator<StringStringMapProto>() {
Iterator<String> keyIter = env.keySet().iterator(); Iterator<String> keyIter = environment.keySet().iterator();
@Override @Override
public void remove() { public void remove() {
@ -414,7 +411,7 @@ implements ContainerLaunchContext {
public StringStringMapProto next() { public StringStringMapProto next() {
String key = keyIter.next(); String key = keyIter.next();
return StringStringMapProto.newBuilder().setKey(key).setValue( return StringStringMapProto.newBuilder().setKey(key).setValue(
(env.get(key))).build(); (environment.get(key))).build();
} }
@Override @Override
@ -424,7 +421,7 @@ implements ContainerLaunchContext {
}; };
} }
}; };
builder.addAllEnv(iterable); builder.addAllEnvironment(iterable);
} }
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {

View File

@ -188,17 +188,11 @@ message AMResponseProto {
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
message ApplicationSubmissionContextProto { message ApplicationSubmissionContextProto {
optional ApplicationIdProto application_id = 1; optional ApplicationIdProto application_id = 1;
optional string application_name = 2; optional string application_name = 2 [default = "N/A"];
optional ResourceProto master_capability = 3; optional string user = 3;
repeated StringURLMapProto resources = 4; optional string queue = 4 [default = "default"];
repeated StringLocalResourceMapProto resources_todo = 5; optional PriorityProto priority = 5;
repeated string fs_tokens = 6; optional ContainerLaunchContextProto am_container_spec = 6;
optional bytes fs_tokens_todo = 7;
repeated StringStringMapProto environment = 8;
repeated string command = 9;
optional string queue = 10;
optional PriorityProto priority = 11;
optional string user = 12;
} }
message YarnClusterMetricsProto { message YarnClusterMetricsProto {
@ -242,7 +236,7 @@ message ContainerLaunchContextProto {
repeated StringLocalResourceMapProto localResources = 4; repeated StringLocalResourceMapProto localResources = 4;
optional bytes container_tokens = 5; optional bytes container_tokens = 5;
repeated StringBytesMapProto service_data = 6; repeated StringBytesMapProto service_data = 6;
repeated StringStringMapProto env = 7; repeated StringStringMapProto environment = 7;
repeated string command = 8; repeated string command = 8;
} }

View File

@ -219,6 +219,12 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "max-completed-applications"; RM_PREFIX + "max-completed-applications";
public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000; public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
/** Default application name */
public static final String DEFAULT_APPLICATION_NAME = "N/A";
/** Default queue name */
public static final String DEFAULT_QUEUE_NAME = "default";
//////////////////////////////// ////////////////////////////////
// Node Manager Configs // Node Manager Configs
//////////////////////////////// ////////////////////////////////

View File

@ -89,7 +89,7 @@ public class ContainerLaunch implements Callable<Integer> {
final Map<Path,String> localResources = container.getLocalizedResources(); final Map<Path,String> localResources = container.getLocalizedResources();
String containerIdStr = ConverterUtils.toString(container.getContainerID()); String containerIdStr = ConverterUtils.toString(container.getContainerID());
final String user = launchContext.getUser(); final String user = launchContext.getUser();
final Map<String,String> env = launchContext.getEnv(); final Map<String,String> env = launchContext.getEnvironment();
final List<String> command = launchContext.getCommands(); final List<String> command = launchContext.getCommands();
int ret = -1; int ret = -1;
@ -109,7 +109,7 @@ public class ContainerLaunch implements Callable<Integer> {
} }
launchContext.setCommands(newCmds); launchContext.setCommands(newCmds);
Map<String, String> envs = launchContext.getEnv(); Map<String, String> envs = launchContext.getEnvironment();
Map<String, String> newEnvs = new HashMap<String, String>(envs.size()); Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
for (Entry<String, String> entry : envs.entrySet()) { for (Entry<String, String> entry : envs.entrySet()) {
newEnvs.put( newEnvs.put(
@ -118,7 +118,7 @@ public class ContainerLaunch implements Callable<Integer> {
ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath())); containerLogDir.toUri().getPath()));
} }
launchContext.setEnv(newEnvs); launchContext.setEnvironment(newEnvs);
// /////////////////////////// End of variable expansion // /////////////////////////// End of variable expansion
FileContext lfs = FileContext.getLocalFSFileContext(); FileContext lfs = FileContext.getLocalFSFileContext();

View File

@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
@ -90,7 +89,6 @@ public class ClientRMService extends AbstractService implements
final private AtomicInteger applicationCounter = new AtomicInteger(0); final private AtomicInteger applicationCounter = new AtomicInteger(0);
final private YarnScheduler scheduler; final private YarnScheduler scheduler;
final private RMContext rmContext; final private RMContext rmContext;
private final AMLivelinessMonitor amLivelinessMonitor;
private final RMAppManager rmAppManager; private final RMAppManager rmAppManager;
private String clientServiceBindAddress; private String clientServiceBindAddress;
@ -106,7 +104,6 @@ public class ClientRMService extends AbstractService implements
super(ClientRMService.class.getName()); super(ClientRMService.class.getName());
this.scheduler = scheduler; this.scheduler = scheduler;
this.rmContext = rmContext; this.rmContext = rmContext;
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rmAppManager = rmAppManager; this.rmAppManager = rmAppManager;
} }
@ -195,15 +192,18 @@ public class ClientRMService extends AbstractService implements
SubmitApplicationRequest request) throws YarnRemoteException { SubmitApplicationRequest request) throws YarnRemoteException {
ApplicationSubmissionContext submissionContext = request ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext(); .getApplicationSubmissionContext();
ApplicationId applicationId = null; ApplicationId applicationId = submissionContext.getApplicationId();
String user = null; String user = submissionContext.getUser();
try { try {
user = UserGroupInformation.getCurrentUser().getShortUserName(); user = UserGroupInformation.getCurrentUser().getShortUserName();
applicationId = submissionContext.getApplicationId();
if (rmContext.getRMApps().get(applicationId) != null) { if (rmContext.getRMApps().get(applicationId) != null) {
throw new IOException("Application with id " + applicationId throw new IOException("Application with id " + applicationId
+ " is already present! Cannot add a duplicate!"); + " is already present! Cannot add a duplicate!");
} }
// Safety
submissionContext.setUser(user);
// This needs to be synchronous as the client can query // This needs to be synchronous as the client can query
// immediately following the submission to get the application status. // immediately following the submission to get the application status.
// So call handle directly and do not send an event. // So call handle directly and do not send an event.
@ -226,6 +226,7 @@ public class ClientRMService extends AbstractService implements
return response; return response;
} }
@SuppressWarnings("unchecked")
@Override @Override
public FinishApplicationResponse finishApplication( public FinishApplicationResponse finishApplication(
FinishApplicationRequest request) throws YarnRemoteException { FinishApplicationRequest request) throws YarnRemoteException {

View File

@ -210,7 +210,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
} }
} }
protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) { @SuppressWarnings("unchecked")
protected synchronized void submitApplication(
ApplicationSubmissionContext submissionContext) {
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null; RMApp application = null;
try { try {
@ -224,27 +226,37 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
clientTokenStr = clientToken.encodeToUrlString(); clientTokenStr = clientToken.encodeToUrlString();
LOG.debug("Sending client token as " + clientTokenStr); LOG.debug("Sending client token as " + clientTokenStr);
} }
submissionContext.setQueue(submissionContext.getQueue() == null
? "default" : submissionContext.getQueue()); // Sanity checks
submissionContext.setApplicationName(submissionContext if (submissionContext.getQueue() == null) {
.getApplicationName() == null ? "N/A" : submissionContext submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
.getApplicationName()); }
if (submissionContext.getApplicationName() == null) {
submissionContext.setApplicationName(
YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
// Store application for recovery
ApplicationStore appStore = rmContext.getApplicationsStore() ApplicationStore appStore = rmContext.getApplicationsStore()
.createApplicationStore(submissionContext.getApplicationId(), .createApplicationStore(submissionContext.getApplicationId(),
submissionContext); submissionContext);
// Create RMApp
application = new RMAppImpl(applicationId, rmContext, application = new RMAppImpl(applicationId, rmContext,
this.conf, submissionContext.getApplicationName(), user, this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext, clientTokenStr, submissionContext.getQueue(), submissionContext, clientTokenStr,
appStore, rmContext.getAMLivelinessMonitor(), this.scheduler, appStore, this.scheduler,
this.masterService); this.masterService);
if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
LOG.info("Application with id " + applicationId + LOG.info("Application with id " + applicationId +
" is already present! Cannot add a duplicate!"); " is already present! Cannot add a duplicate!");
// don't send event through dispatcher as it will be handled by app already // don't send event through dispatcher as it will be handled by app
// present with this id. // already present with this id.
application.handle(new RMAppRejectedEvent(applicationId, application.handle(new RMAppRejectedEvent(applicationId,
"Application with this id is already present! Cannot add a duplicate!")); "Application with this id is already present! " +
"Cannot add a duplicate!"));
} else { } else {
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.START)); new RMAppEvent(applicationId, RMAppEventType.START));

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
public class RMAppManagerSubmitEvent extends RMAppManagerEvent { public class RMAppManagerSubmitEvent extends RMAppManagerEvent {

View File

@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -120,7 +119,8 @@ public class AMLauncher implements Runnable {
+ " for AM " + application.getAppAttemptId()); + " for AM " + application.getAppAttemptId());
ContainerLaunchContext launchContext = ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID); createAMContainerLaunchContext(applicationContext, masterContainerID);
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class); StartContainerRequest request =
recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(launchContext); request.setContainerLaunchContext(launchContext);
containerMgrProxy.startContainer(request); containerMgrProxy.startContainer(request);
LOG.info("Done launching container " + application.getMasterContainer() LOG.info("Done launching container " + application.getMasterContainer()
@ -130,7 +130,8 @@ public class AMLauncher implements Runnable {
private void cleanup() throws IOException { private void cleanup() throws IOException {
connect(); connect();
ContainerId containerId = application.getMasterContainer().getId(); ContainerId containerId = application.getMasterContainer().getId();
StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class); StopContainerRequest stopRequest =
recordFactory.newRecordInstance(StopContainerRequest.class);
stopRequest.setContainerId(containerId); stopRequest.setContainerId(containerId);
containerMgrProxy.stopContainer(stopRequest); containerMgrProxy.stopContainer(stopRequest);
} }
@ -145,7 +146,7 @@ public class AMLauncher implements Runnable {
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
UserGroupInformation currentUser = UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser("TODO"); // TODO UserGroupInformation.createRemoteUser("yarn"); // TODO
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
ContainerToken containerToken = container.getContainerToken(); ContainerToken containerToken = container.getContainerToken();
Token<ContainerTokenIdentifier> token = Token<ContainerTokenIdentifier> token =
@ -170,8 +171,8 @@ public class AMLauncher implements Runnable {
ContainerId containerID) throws IOException { ContainerId containerID) throws IOException {
// Construct the actual Container // Construct the actual Container
ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class); ContainerLaunchContext container =
container.setCommands(applicationMasterContext.getCommandList()); applicationMasterContext.getAMContainerSpec();
StringBuilder mergedCommand = new StringBuilder(); StringBuilder mergedCommand = new StringBuilder();
String failCount = Integer.toString(application.getAppAttemptId() String failCount = Integer.toString(application.getAppAttemptId()
.getAttemptId()); .getAttemptId());
@ -189,34 +190,28 @@ public class AMLauncher implements Runnable {
LOG.info("Command to launch container " + LOG.info("Command to launch container " +
containerID + " : " + mergedCommand); containerID + " : " + mergedCommand);
Map<String, String> environment =
applicationMasterContext.getAllEnvironment(); // Finalize the container
environment.putAll(setupTokensInEnv(applicationMasterContext));
container.setEnv(environment);
// Construct the actual Container
container.setContainerId(containerID); container.setContainerId(containerID);
container.setUser(applicationMasterContext.getUser()); container.setUser(applicationMasterContext.getUser());
container.setResource(applicationMasterContext.getMasterCapability()); setupTokensAndEnv(container);
container.setLocalResources(applicationMasterContext.getAllResourcesTodo());
container.setContainerTokens(applicationMasterContext.getFsTokensTodo());
return container; return container;
} }
private Map<String, String> setupTokensInEnv( private void setupTokensAndEnv(
ApplicationSubmissionContext asc) ContainerLaunchContext container)
throws IOException { throws IOException {
Map<String, String> env = Map<String, String> environment = container.getEnvironment();
new HashMap<String, String>();
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM. // TODO: Security enabled/disabled info should come from RM.
Credentials credentials = new Credentials(); Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer(); DataInputByteBuffer dibb = new DataInputByteBuffer();
if (asc.getFsTokensTodo() != null) { if (container.getContainerTokens() != null) {
// TODO: Don't do this kind of checks everywhere. // TODO: Don't do this kind of checks everywhere.
dibb.reset(asc.getFsTokensTodo()); dibb.reset(container.getContainerTokens());
credentials.readTokenStorageStream(dibb); credentials.readTokenStorageStream(dibb);
} }
@ -236,14 +231,16 @@ public class AMLauncher implements Runnable {
token.setService(new Text(resolvedAddr)); token.setService(new Text(resolvedAddr));
String appMasterTokenEncoded = token.encodeToUrlString(); String appMasterTokenEncoded = token.encodeToUrlString();
LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded); LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
env.put(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME, environment.put(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
appMasterTokenEncoded); appMasterTokenEncoded);
// Add the RM token // Add the RM token
credentials.addToken(new Text(resolvedAddr), token); credentials.addToken(new Text(resolvedAddr), token);
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob); credentials.writeTokenStorageToStream(dob);
asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); container.setContainerTokens(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier( ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
application.getAppAttemptId().getApplicationId()); application.getAppAttemptId().getApplicationId());
@ -252,9 +249,10 @@ public class AMLauncher implements Runnable {
String encoded = String encoded =
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded()); Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
LOG.debug("The encoded client secret-key to be put in env : " + encoded); LOG.debug("The encoded client secret-key to be put in env : " + encoded);
env.put(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded); environment.put(
ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME,
encoded);
} }
return env;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -86,7 +86,6 @@ public class RMAppImpl implements RMApp {
// Mutable fields // Mutable fields
private long startTime; private long startTime;
private long finishTime; private long finishTime;
private AMLivelinessMonitor amLivelinessMonitor;
private RMAppAttempt currentAttempt; private RMAppAttempt currentAttempt;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
@ -163,7 +162,7 @@ public class RMAppImpl implements RMApp {
public RMAppImpl(ApplicationId applicationId, RMContext rmContext, public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue, Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr, ApplicationSubmissionContext submissionContext, String clientTokenStr,
ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor, ApplicationStore appStore,
YarnScheduler scheduler, ApplicationMasterService masterService) { YarnScheduler scheduler, ApplicationMasterService masterService) {
this.applicationId = applicationId; this.applicationId = applicationId;
@ -176,7 +175,6 @@ public class RMAppImpl implements RMApp {
this.submissionContext = submissionContext; this.submissionContext = submissionContext;
this.clientTokenStr = clientTokenStr; this.clientTokenStr = clientTokenStr;
this.appStore = appStore; this.appStore = appStore;
this.amLivelinessMonitor = amLivelinessMonitor;
this.scheduler = scheduler; this.scheduler = scheduler;
this.masterService = masterService; this.masterService = masterService;
this.startTime = System.currentTimeMillis(); this.startTime = System.currentTimeMillis();
@ -380,6 +378,7 @@ public class RMAppImpl implements RMApp {
} }
} }
@SuppressWarnings("unchecked")
private void createNewAttempt() { private void createNewAttempt() {
ApplicationAttemptId appAttemptId = Records ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class); .newRecord(ApplicationAttemptId.class);
@ -434,6 +433,7 @@ public class RMAppImpl implements RMApp {
return nodes; return nodes;
} }
@SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app); Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) { for (NodeId nodeId : nodes) {

View File

@ -84,6 +84,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
RMAppAttemptEvent> stateMachine; RMAppAttemptEvent> stateMachine;
private final RMContext rmContext; private final RMContext rmContext;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler; private final EventHandler eventHandler;
private final YarnScheduler scheduler; private final YarnScheduler scheduler;
private final ApplicationMasterService masterService; private final ApplicationMasterService masterService;
@ -459,7 +460,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
// Request a container for the AM. // Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest( ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
.getMasterCapability(), 1); .getAMContainerSpec().getResource(), 1);
LOG.debug("About to request resources for AM of " LOG.debug("About to request resources for AM of "
+ appAttempt.applicationAttemptId + " required " + request); + appAttempt.applicationAttemptId + " required " + request);

View File

@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE; import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
@ -59,7 +58,8 @@ class AppsBlock extends HtmlBlock {
String appId = app.getApplicationId().toString(); String appId = app.getApplicationId().toString();
String trackingUrl = app.getTrackingUrl(); String trackingUrl = app.getTrackingUrl();
String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" : String ui = trackingUrl == null || trackingUrl.isEmpty() ? "UNASSIGNED" :
(app.getFinishTime() == 0 ? "ApplicationMaster" : "JobHistory"); (app.getFinishTime() == 0 ?
"ApplicationMaster URL" : "JobHistory URL");
String percent = String.format("%.1f", app.getProgress() * 100); String percent = String.format("%.1f", app.getProgress() * 100);
tbody. tbody.
tr(). tr().

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@ -81,13 +82,17 @@ public class MockRM extends ResourceManager {
ApplicationId appId = resp.getApplicationId(); ApplicationId appId = resp.getApplicationId();
SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class); SubmitApplicationRequest req = Records.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records.newRecord(ApplicationSubmissionContext.class); ApplicationSubmissionContext sub =
Records.newRecord(ApplicationSubmissionContext.class);
sub.setApplicationId(appId); sub.setApplicationId(appId);
sub.setApplicationName(""); sub.setApplicationName("");
sub.setUser(""); sub.setUser("");
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
Resource capability = Records.newRecord(Resource.class); Resource capability = Records.newRecord(Resource.class);
capability.setMemory(masterMemory); capability.setMemory(masterMemory);
sub.setMasterCapability(capability); clc.setResource(capability);
sub.setAMContainerSpec(clc);
req.setApplicationSubmissionContext(sub); req.setApplicationSubmissionContext(sub);
client.submitApplication(req); client.submitApplication(req);

View File

@ -18,19 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -44,7 +37,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
@ -63,8 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -75,7 +65,6 @@ import com.google.common.collect.Lists;
*/ */
public class TestAppManager{ public class TestAppManager{
private static final Log LOG = LogFactory.getLog(TestAppManager.class);
private static RMAppEventType appEventType = RMAppEventType.KILL; private static RMAppEventType appEventType = RMAppEventType.KILL;
public synchronized RMAppEventType getAppEventType() { public synchronized RMAppEventType getAppEventType() {
@ -117,10 +106,8 @@ public class TestAppManager{
public class TestAppManagerDispatcher implements public class TestAppManagerDispatcher implements
EventHandler<RMAppManagerEvent> { EventHandler<RMAppManagerEvent> {
private final RMContext rmContext;
public TestAppManagerDispatcher(RMContext rmContext) { public TestAppManagerDispatcher() {
this.rmContext = rmContext;
} }
@Override @Override
@ -132,15 +119,11 @@ public class TestAppManager{
public class TestDispatcher implements public class TestDispatcher implements
EventHandler<RMAppEvent> { EventHandler<RMAppEvent> {
private final RMContext rmContext; public TestDispatcher() {
public TestDispatcher(RMContext rmContext) {
this.rmContext = rmContext;
} }
@Override @Override
public void handle(RMAppEvent event) { public void handle(RMAppEvent event) {
ApplicationId appID = event.getApplicationId();
//RMApp rmApp = this.rmContext.getRMApps().get(appID); //RMApp rmApp = this.rmContext.getRMApps().get(appID);
setAppEventType(event.getType()); setAppEventType(event.getType());
System.out.println("in handle routine " + getAppEventType().toString()); System.out.println("in handle routine " + getAppEventType().toString());
@ -178,7 +161,8 @@ public class TestAppManager{
public void setCompletedAppsMax(int max) { public void setCompletedAppsMax(int max) {
super.setCompletedAppsMax(max); super.setCompletedAppsMax(max);
} }
public void submitApplication(ApplicationSubmissionContext submissionContext) { public void submitApplication(
ApplicationSubmissionContext submissionContext) {
super.submitApplication(submissionContext); super.submitApplication(submissionContext);
} }
} }
@ -336,8 +320,9 @@ public class TestAppManager{
} }
protected void setupDispatcher(RMContext rmContext, Configuration conf) { protected void setupDispatcher(RMContext rmContext, Configuration conf) {
TestDispatcher testDispatcher = new TestDispatcher(rmContext); TestDispatcher testDispatcher = new TestDispatcher();
TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext); TestAppManagerDispatcher testAppManagerDispatcher =
new TestAppManagerDispatcher();
rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher); rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher);
rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher); rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
((Service)rmContext.getDispatcher()).init(conf); ((Service)rmContext.getDispatcher()).init(conf);
@ -359,7 +344,8 @@ public class TestAppManager{
ApplicationId appID = MockApps.newAppID(1); ApplicationId appID = MockApps.newAppID(1);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(appID); context.setApplicationId(appID);
setupDispatcher(rmContext, conf); setupDispatcher(rmContext, conf);
@ -367,8 +353,12 @@ public class TestAppManager{
RMApp app = rmContext.getRMApps().get(appID); RMApp app = rmContext.getRMApps().get(appID);
Assert.assertNotNull("app is null", app); Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
Assert.assertEquals("app name doesn't match", "N/A", app.getName()); Assert.assertEquals("app name doesn't match",
Assert.assertEquals("app queue doesn't match", "default", app.getQueue()); YarnConfiguration.DEFAULT_APPLICATION_NAME,
app.getName());
Assert.assertEquals("app queue doesn't match",
YarnConfiguration.DEFAULT_QUEUE_NAME,
app.getQueue());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
Assert.assertNotNull("app store is null", app.getApplicationStore()); Assert.assertNotNull("app store is null", app.getApplicationStore());

View File

@ -128,7 +128,7 @@ public class TestRMAppTransitions {
RMApp application = new RMAppImpl(applicationId, rmContext, RMApp application = new RMAppImpl(applicationId, rmContext,
conf, name, user, conf, name, user,
queue, submissionContext, clientTokenStr, queue, submissionContext, clientTokenStr,
appStore, rmContext.getAMLivelinessMonitor(), scheduler, appStore, scheduler,
masterService); masterService);
testAppStartState(applicationId, user, name, queue, application); testAppStartState(applicationId, user, name, queue, application);

View File

@ -27,6 +27,8 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
@ -54,10 +56,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -137,15 +140,11 @@ public class TestContainerTokenSecretManager {
ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext appSubmissionContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class); recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appSubmissionContext.setApplicationId(appID); appSubmissionContext.setApplicationId(appID);
appSubmissionContext.setMasterCapability(recordFactory ContainerLaunchContext amContainer =
.newRecordInstance(Resource.class)); recordFactory.newRecordInstance(ContainerLaunchContext.class);
appSubmissionContext.getMasterCapability().setMemory(1024); amContainer.setResource(Resources.createResource(1024));
// appSubmissionContext.resources = new HashMap<String, URL>(); amContainer.setCommands(Arrays.asList("sleep", "100"));
appSubmissionContext.setUser("testUser"); appSubmissionContext.setUser("testUser");
// appSubmissionContext.environment = new HashMap<String, String>();
// appSubmissionContext.command = new ArrayList<String>();
appSubmissionContext.addCommand("sleep");
appSubmissionContext.addCommand("100");
// TODO: Use a resource to work around bugs. Today NM doesn't create local // TODO: Use a resource to work around bugs. Today NM doesn't create local
// app-dirs if there are no file to download!! // app-dirs if there are no file to download!!
@ -162,10 +161,11 @@ public class TestContainerTokenSecretManager {
rsrc.setTimestamp(file.lastModified()); rsrc.setTimestamp(file.lastModified());
rsrc.setType(LocalResourceType.FILE); rsrc.setType(LocalResourceType.FILE);
rsrc.setVisibility(LocalResourceVisibility.PRIVATE); rsrc.setVisibility(LocalResourceVisibility.PRIVATE);
appSubmissionContext.setResourceTodo("testFile", rsrc); amContainer.setLocalResources(Collections.singletonMap("testFile", rsrc));
SubmitApplicationRequest submitRequest = recordFactory SubmitApplicationRequest submitRequest = recordFactory
.newRecordInstance(SubmitApplicationRequest.class); .newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(appSubmissionContext); submitRequest.setApplicationSubmissionContext(appSubmissionContext);
appSubmissionContext.setAMContainerSpec(amContainer);
resourceManager.getClientRMService().submitApplication(submitRequest); resourceManager.getClientRMService().submitApplication(submitRequest);
// Wait till container gets allocated for AM // Wait till container gets allocated for AM