MAPREDUCE-6052. Supported overriding the default container-log4j.properties file per job. Contributed by Junping Du.

(cherry picked from commit ed63b11646)
This commit is contained in:
Zhijie Shen 2014-11-01 00:47:57 -07:00
parent e286ae90b7
commit c4aaa4db6c
8 changed files with 178 additions and 9 deletions

View File

@ -64,6 +64,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6018. Added an MR specific config to enable emitting job history
data to the timeline server. (Robert Kanter via zjshen)
MAPREDUCE-6052. Supported overriding the default container-log4j.properties
file per job. (Junping Du via zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID;
@ -148,11 +149,11 @@ public class MapReduceChildJVM {
private static void setupLog4jProperties(Task task,
Vector<String> vargs,
long logSize) {
long logSize, Configuration conf) {
String logLevel = getChildLogLevel(task.conf, task.isMapTask());
int numBackups = task.conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs);
MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
}
public static List<String> getVMCommand(
@ -208,7 +209,7 @@ public class MapReduceChildJVM {
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
setupLog4jProperties(task, vargs, logSize);
setupLog4jProperties(task, vargs, logSize, conf);
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()

View File

@ -77,6 +77,36 @@ public class TestMapReduceChildJVM {
app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS"));
Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
}
@Test (timeout = 30000)
public void testCommandLineWithLog4JConifg() throws Exception {
MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
String testLogPropertieFile = "test-log4j.properties";
String testLogPropertiePath = "../"+"test-log4j.properties";
conf.set(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, testLogPropertiePath);
Job job = app.submit(conf);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
Assert.assertEquals(
"[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
" -Djava.net.preferIPv4Stack=true" +
" -Dhadoop.metrics.log.level=WARN" +
" -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
" -Dlog4j.configuration=" + testLogPropertieFile +
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
" -Dyarn.app.container.log.filesize=0" +
" -Dhadoop.root.logger=INFO,CLA" +
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
" 54321" +
" attempt_0_0000_m_000000_0" +
" 0" +
" 1><LOG_DIR>/stdout" +
" 2><LOG_DIR>/stderr ]", app.myCommandLine);
}
private static final class MyMRApp extends MRApp {

View File

@ -613,10 +613,26 @@ public class MRApps extends Apps {
* @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)}
* @param numBackups See {@link RollingFileAppender#setMaxBackupIndex(int)}
* @param vargs the argument list to append to
* @param conf configuration of MR job
*/
public static void addLog4jSystemProperties(
String logLevel, long logSize, int numBackups, List<String> vargs) {
vargs.add("-Dlog4j.configuration=container-log4j.properties");
String logLevel, long logSize, int numBackups, List<String> vargs,
Configuration conf) {
String log4jPropertyFile =
conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
if (log4jPropertyFile.isEmpty()) {
vargs.add("-Dlog4j.configuration=container-log4j.properties");
} else {
URI log4jURI = null;
try {
log4jURI = new URI(log4jPropertyFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path log4jPath = new Path(log4jURI);
vargs.add("-Dlog4j.configuration="+log4jPath.getName());
}
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
vargs.add(

View File

@ -75,6 +75,14 @@ public class JobSubmissionFiles {
public static Path getJobDistCacheFiles(Path jobSubmitDir) {
return new Path(jobSubmitDir, "files");
}
/**
* Get the job distributed cache path for log4j properties.
* @param jobSubmitDir
*/
public static Path getJobLog4jFile(Path jobSubmitDir) {
return new Path(jobSubmitDir, "log4j");
}
/**
* Get the job distributed cache archives path.
* @param jobSubmitDir

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
@ -262,15 +263,102 @@ class JobSubmitter {
LOG.warn("No job jar file set. User classes may not be found. "+
"See Job or Job#setJar(String).");
}
addLog4jToDistributedCache(job, submitJobDir);
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
// get DelegationToken for each cached file
// get DelegationToken for cached file
ClientDistributedCacheManager.getDelegationTokens(conf, job
.getCredentials());
}
// copy user specified log4j.property file in local
// to HDFS with putting on distributed cache and adding its parent directory
// to classpath.
@SuppressWarnings("deprecation")
private void copyLog4jPropertyFile(Job job, Path submitJobDir,
short replication) throws IOException {
Configuration conf = job.getConfiguration();
String file = validateFilePath(
conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf);
LOG.debug("default FileSystem: " + jtFs.getUri());
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
if (!jtFs.exists(submitJobDir)) {
throw new IOException("Cannot find job submission directory! "
+ "It should just be created, so something wrong here.");
}
Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir);
// first copy local log4j.properties file to HDFS under submitJobDir
if (file != null) {
FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms);
URI tmpURI = null;
try {
tmpURI = new URI(file);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication);
DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf);
}
}
/**
* takes input as a path string for file and verifies if it exist.
* It defaults for file:/// if the files specified do not have a scheme.
* it returns the paths uri converted defaulting to file:///.
* So an input of /home/user/file1 would return file:///home/user/file1
* @param file
* @param conf
* @return
*/
private String validateFilePath(String file, Configuration conf)
throws IOException {
if (file == null) {
return null;
}
if (file.isEmpty()) {
throw new IllegalArgumentException("File name can't be empty string");
}
String finalPath;
URI pathURI;
try {
pathURI = new URI(file);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path path = new Path(pathURI);
FileSystem localFs = FileSystem.getLocal(conf);
if (pathURI.getScheme() == null) {
//default to the local file system
//check if the file exists or not first
if (!localFs.exists(path)) {
throw new FileNotFoundException("File " + file + " does not exist.");
}
finalPath = path.makeQualified(localFs.getUri(),
localFs.getWorkingDirectory()).toString();
}
else {
// check if the file exists in this file system
// we need to recreate this filesystem object to copy
// these files to the file system ResourceManager is running
// on.
FileSystem fs = path.getFileSystem(conf);
if (!fs.exists(path)) {
throw new FileNotFoundException("File " + file + " does not exist.");
}
finalPath = path.makeQualified(fs.getUri(),
fs.getWorkingDirectory()).toString();
}
return finalPath;
}
private URI getPathURI(Path destPath, String fragment)
throws URISyntaxException {
URI pathURI = destPath.toUri();
@ -305,7 +393,7 @@ class JobSubmitter {
// Set the working directory
if (job.getWorkingDirectory() == null) {
job.setWorkingDirectory(jtFs.getWorkingDirectory());
job.setWorkingDirectory(jtFs.getWorkingDirectory());
}
}
@ -395,6 +483,10 @@ class JobSubmitter {
}
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
@ -673,4 +765,20 @@ class JobSubmitter {
DistributedCache.addCacheArchive(uri, conf);
}
}
private void addLog4jToDistributedCache(Job job,
Path jobSubmitDir) throws IOException {
Configuration conf = job.getConfiguration();
String log4jPropertyFile =
conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
if (!log4jPropertyFile.isEmpty()) {
short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
copyLog4jPropertyFile(job, jobSubmitDir, replication);
// Set the working directory
if (job.getWorkingDirectory() == null) {
job.setWorkingDirectory(jtFs.getWorkingDirectory());
}
}
}
}

View File

@ -719,6 +719,9 @@ public interface MRJobConfig {
*/
public static final String MAPREDUCE_APPLICATION_CLASSPATH =
"mapreduce.application.classpath";
public static final String MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE =
"mapreduce.job.log4j-properties-file";
/**
* Path to MapReduce framework archive

View File

@ -398,7 +398,7 @@ public class YARNRunner implements ClientProtocol {
MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs);
MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
// Check for Java Lib Path usage in MAP and REDUCE configs
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",