MAPREDUCE-6825. YARNRunner#createApplicationSubmissionContext method is longer than 150 lines (Contributed by Gergely Novák via Daniel Templeton)
(cherry picked from commit 732ee6f0b5
)
This commit is contained in:
parent
6c5fd82773
commit
4c883f331c
|
@ -291,8 +291,7 @@ public class YARNRunner implements ClientProtocol {
|
|||
throws IOException, InterruptedException {
|
||||
|
||||
addHistoryToken(ts);
|
||||
|
||||
// Construct necessary information to start the MR AM
|
||||
|
||||
ApplicationSubmissionContext appContext =
|
||||
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
|
||||
|
||||
|
@ -331,34 +330,15 @@ public class YARNRunner implements ClientProtocol {
|
|||
return rsrc;
|
||||
}
|
||||
|
||||
public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
Configuration jobConf,
|
||||
String jobSubmitDir, Credentials ts) throws IOException {
|
||||
ApplicationId applicationId = resMgrDelegate.getApplicationId();
|
||||
|
||||
// Setup resource requirements
|
||||
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
||||
capability.setMemorySize(
|
||||
conf.getInt(
|
||||
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
|
||||
)
|
||||
);
|
||||
capability.setVirtualCores(
|
||||
conf.getInt(
|
||||
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
|
||||
)
|
||||
);
|
||||
LOG.debug("AppMaster capability = " + capability);
|
||||
|
||||
// Setup LocalResources
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
private Map<String, LocalResource> setupLocalResources(Configuration jobConf,
|
||||
String jobSubmitDir) throws IOException {
|
||||
Map<String, LocalResource> localResources = new HashMap<>();
|
||||
|
||||
Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
|
||||
|
||||
URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext.getDefaultFileSystem()
|
||||
.resolvePath(
|
||||
defaultFileContext.makeQualified(new Path(jobSubmitDir))));
|
||||
URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext
|
||||
.getDefaultFileSystem().resolvePath(
|
||||
defaultFileContext.makeQualified(new Path(jobSubmitDir))));
|
||||
LOG.debug("Creating setup context, jobSubmitDir url is "
|
||||
+ yarnUrlForJobSubmitDir);
|
||||
|
||||
|
@ -371,7 +351,7 @@ public class YARNRunner implements ClientProtocol {
|
|||
FileContext.getFileContext(jobJarPath.toUri(), jobConf),
|
||||
jobJarPath,
|
||||
LocalResourceType.PATTERN);
|
||||
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
|
||||
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
|
||||
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
|
||||
rc.setPattern(pattern);
|
||||
localResources.put(MRJobConfig.JOB_JAR, rc);
|
||||
|
@ -392,13 +372,11 @@ public class YARNRunner implements ClientProtocol {
|
|||
new Path(jobSubmitDir, s), LocalResourceType.FILE));
|
||||
}
|
||||
|
||||
// Setup security tokens
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
return localResources;
|
||||
}
|
||||
|
||||
// Setup the command to run the AM
|
||||
List<String> vargs = new ArrayList<String>(8);
|
||||
private List<String> setupAMCommand(Configuration jobConf) {
|
||||
List<String> vargs = new ArrayList<>(8);
|
||||
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
|
||||
+ "/bin/java");
|
||||
|
||||
|
@ -409,27 +387,35 @@ public class YARNRunner implements ClientProtocol {
|
|||
MRApps.addLog4jSystemProperties(null, vargs, conf);
|
||||
|
||||
// Check for Java Lib Path usage in MAP and REDUCE configs
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
|
||||
MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map",
|
||||
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce",
|
||||
MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce",
|
||||
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
|
||||
"map",
|
||||
MRJobConfig.MAP_JAVA_OPTS,
|
||||
MRJobConfig.MAP_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
|
||||
"map",
|
||||
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
|
||||
MRJobConfig.MAPRED_ADMIN_USER_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
|
||||
"reduce",
|
||||
MRJobConfig.REDUCE_JAVA_OPTS,
|
||||
MRJobConfig.REDUCE_ENV);
|
||||
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
|
||||
"reduce",
|
||||
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
|
||||
MRJobConfig.MAPRED_ADMIN_USER_ENV);
|
||||
|
||||
// Add AM admin command opts before user command opts
|
||||
// so that it can be overridden by user
|
||||
String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
|
||||
MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
|
||||
warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
|
||||
warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
|
||||
MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
|
||||
vargs.add(mrAppMasterAdminOptions);
|
||||
|
||||
|
||||
// Add AM user command opts
|
||||
String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
|
||||
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
|
||||
warnForJavaLibPath(mrAppMasterUserOptions, "app master",
|
||||
warnForJavaLibPath(mrAppMasterUserOptions, "app master",
|
||||
MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
|
||||
vargs.add(mrAppMasterUserOptions);
|
||||
|
||||
|
@ -449,9 +435,14 @@ public class YARNRunner implements ClientProtocol {
|
|||
Path.SEPARATOR + ApplicationConstants.STDOUT);
|
||||
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
||||
Path.SEPARATOR + ApplicationConstants.STDERR);
|
||||
return vargs;
|
||||
}
|
||||
|
||||
private ContainerLaunchContext setupContainerLaunchContextForAM(
|
||||
Configuration jobConf, Map<String, LocalResource> localResources,
|
||||
ByteBuffer securityTokens, List<String> vargs) throws IOException {
|
||||
|
||||
Vector<String> vargsFinal = new Vector<String>(8);
|
||||
Vector<String> vargsFinal = new Vector<>(8);
|
||||
// Final command
|
||||
StringBuilder mergedCommand = new StringBuilder();
|
||||
for (CharSequence str : vargs) {
|
||||
|
@ -464,7 +455,7 @@ public class YARNRunner implements ClientProtocol {
|
|||
|
||||
// Setup the CLASSPATH in environment
|
||||
// i.e. add { Hadoop jars, job jar, CWD } to classpath.
|
||||
Map<String, String> environment = new HashMap<String, String>();
|
||||
Map<String, String> environment = new HashMap<>();
|
||||
MRApps.setClasspath(environment, conf);
|
||||
|
||||
// Shell
|
||||
|
@ -477,28 +468,68 @@ public class YARNRunner implements ClientProtocol {
|
|||
MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
|
||||
|
||||
// Setup the environment variables for Admin first
|
||||
MRApps.setEnvFromInputString(environment,
|
||||
MRApps.setEnvFromInputString(environment,
|
||||
conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV,
|
||||
MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf);
|
||||
// Setup the environment variables (LD_LIBRARY_PATH, etc)
|
||||
MRApps.setEnvFromInputString(environment,
|
||||
MRApps.setEnvFromInputString(environment,
|
||||
conf.get(MRJobConfig.MR_AM_ENV), conf);
|
||||
|
||||
// Parse distributed cache
|
||||
MRApps.setupDistributedCache(jobConf, localResources);
|
||||
|
||||
Map<ApplicationAccessType, String> acls
|
||||
= new HashMap<ApplicationAccessType, String>(2);
|
||||
Map<ApplicationAccessType, String> acls = new HashMap<>(2);
|
||||
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
|
||||
MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
|
||||
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
|
||||
MRJobConfig.JOB_ACL_MODIFY_JOB,
|
||||
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
|
||||
|
||||
return ContainerLaunchContext.newInstance(localResources, environment,
|
||||
vargsFinal, null, securityTokens, acls);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs all the necessary information to start the MR AM.
|
||||
* @param jobConf the configuration for the MR job
|
||||
* @param jobSubmitDir the directory path for the job
|
||||
* @param ts the security credentials for the job
|
||||
* @return ApplicationSubmissionContext
|
||||
* @throws IOException on IO error (e.g. path resolution)
|
||||
*/
|
||||
public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
Configuration jobConf, String jobSubmitDir, Credentials ts)
|
||||
throws IOException {
|
||||
ApplicationId applicationId = resMgrDelegate.getApplicationId();
|
||||
|
||||
// Setup resource requirements
|
||||
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
||||
capability.setMemorySize(
|
||||
conf.getInt(
|
||||
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
|
||||
)
|
||||
);
|
||||
capability.setVirtualCores(
|
||||
conf.getInt(
|
||||
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
|
||||
)
|
||||
);
|
||||
LOG.debug("AppMaster capability = " + capability);
|
||||
|
||||
// Setup LocalResources
|
||||
Map<String, LocalResource> localResources =
|
||||
setupLocalResources(jobConf, jobSubmitDir);
|
||||
|
||||
// Setup security tokens
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
|
||||
// Setup ContainerLaunchContext for AM container
|
||||
ContainerLaunchContext amContainer =
|
||||
ContainerLaunchContext.newInstance(localResources, environment,
|
||||
vargsFinal, null, securityTokens, acls);
|
||||
List<String> vargs = setupAMCommand(jobConf);
|
||||
ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
|
||||
jobConf, localResources, securityTokens, vargs);
|
||||
|
||||
String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
|
||||
if (regex != null && !regex.isEmpty()) {
|
||||
|
@ -566,7 +597,7 @@ public class YARNRunner implements ClientProtocol {
|
|||
|
||||
appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
|
||||
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
|
||||
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
|
||||
appContext.setApplicationTags(new HashSet<>(tagsFromConf));
|
||||
}
|
||||
|
||||
String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
|
||||
|
|
Loading…
Reference in New Issue