MAPREDUCE-5932. Provide an option to use a dedicated reduce-side shuffle log. Contributed by Gera Shegalov
This commit is contained in:
parent
22afae890d
commit
03ab24aa01
|
@ -235,6 +235,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-5932. Provide an option to use a dedicated reduce-side shuffle
|
||||||
|
log (Gera Shegalov via jlowe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
||||||
|
|
|
@ -20,16 +20,14 @@ package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.TaskLog.LogName;
|
import org.apache.hadoop.mapred.TaskLog.LogName;
|
||||||
import org.apache.hadoop.mapreduce.ID;
|
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
|
@ -52,20 +50,6 @@ public class MapReduceChildJVM {
|
||||||
jobConf.get(JobConf.MAPRED_TASK_ENV));
|
jobConf.get(JobConf.MAPRED_TASK_ENV));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getChildLogLevel(JobConf conf, boolean isMap) {
|
|
||||||
if (isMap) {
|
|
||||||
return conf.get(
|
|
||||||
MRJobConfig.MAP_LOG_LEVEL,
|
|
||||||
JobConf.DEFAULT_LOG_LEVEL.toString()
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
return conf.get(
|
|
||||||
MRJobConfig.REDUCE_LOG_LEVEL,
|
|
||||||
JobConf.DEFAULT_LOG_LEVEL.toString()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void setVMEnv(Map<String, String> environment,
|
public static void setVMEnv(Map<String, String> environment,
|
||||||
Task task) {
|
Task task) {
|
||||||
|
|
||||||
|
@ -79,7 +63,7 @@ public class MapReduceChildJVM {
|
||||||
// streaming) it will have the correct loglevel.
|
// streaming) it will have the correct loglevel.
|
||||||
environment.put(
|
environment.put(
|
||||||
"HADOOP_ROOT_LOGGER",
|
"HADOOP_ROOT_LOGGER",
|
||||||
getChildLogLevel(conf, task.isMapTask()) + ",console");
|
MRApps.getChildLogLevel(conf, task.isMapTask()) + ",console");
|
||||||
|
|
||||||
// TODO: The following is useful for instance in streaming tasks. Should be
|
// TODO: The following is useful for instance in streaming tasks. Should be
|
||||||
// set in ApplicationMaster's env by the RM.
|
// set in ApplicationMaster's env by the RM.
|
||||||
|
@ -147,15 +131,6 @@ public class MapReduceChildJVM {
|
||||||
return adminClasspath + " " + userClasspath;
|
return adminClasspath + " " + userClasspath;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setupLog4jProperties(Task task,
|
|
||||||
Vector<String> vargs,
|
|
||||||
long logSize, Configuration conf) {
|
|
||||||
String logLevel = getChildLogLevel(task.conf, task.isMapTask());
|
|
||||||
int numBackups = task.conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
|
|
||||||
MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
|
|
||||||
MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static List<String> getVMCommand(
|
public static List<String> getVMCommand(
|
||||||
InetSocketAddress taskAttemptListenerAddr, Task task,
|
InetSocketAddress taskAttemptListenerAddr, Task task,
|
||||||
JVMId jvmID) {
|
JVMId jvmID) {
|
||||||
|
@ -206,10 +181,7 @@ public class MapReduceChildJVM {
|
||||||
Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
|
Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
|
||||||
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
|
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
|
||||||
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
|
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
|
||||||
|
MRApps.addLog4jSystemProperties(task, vargs, conf);
|
||||||
// Setup the log4j prop
|
|
||||||
long logSize = TaskLog.getTaskLogLength(conf);
|
|
||||||
setupLog4jProperties(task, vargs, logSize, conf);
|
|
||||||
|
|
||||||
if (conf.getProfileEnabled()) {
|
if (conf.getProfileEnabled()) {
|
||||||
if (conf.getProfileTaskRange(task.isMapTask()
|
if (conf.getProfileTaskRange(task.isMapTask()
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class TestMapReduceChildJVM {
|
||||||
" -Dlog4j.configuration=container-log4j.properties" +
|
" -Dlog4j.configuration=container-log4j.properties" +
|
||||||
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
|
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
|
||||||
" -Dyarn.app.container.log.filesize=0" +
|
" -Dyarn.app.container.log.filesize=0" +
|
||||||
" -Dhadoop.root.logger=INFO,CLA" +
|
" -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog" +
|
||||||
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
|
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
|
||||||
" 54321" +
|
" 54321" +
|
||||||
" attempt_0_0000_m_000000_0" +
|
" attempt_0_0000_m_000000_0" +
|
||||||
|
@ -78,6 +78,73 @@ public class TestMapReduceChildJVM {
|
||||||
Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
|
Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testReduceCommandLineWithSeparateShuffle() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG, true);
|
||||||
|
testReduceCommandLine(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testReduceCommandLineWithSeparateCRLAShuffle() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG, true);
|
||||||
|
conf.setLong(MRJobConfig.SHUFFLE_LOG_KB, 1L);
|
||||||
|
conf.setInt(MRJobConfig.SHUFFLE_LOG_BACKUPS, 3);
|
||||||
|
testReduceCommandLine(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testReduceCommandLine() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
testReduceCommandLine(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testReduceCommandLine(Configuration conf)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
MyMRApp app = new MyMRApp(0, 1, true, this.getClass().getName(), true);
|
||||||
|
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
|
||||||
|
Job job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
app.verifyCompleted();
|
||||||
|
|
||||||
|
final long shuffleLogSize =
|
||||||
|
conf.getLong(MRJobConfig.SHUFFLE_LOG_KB, 0L) * 1024L;
|
||||||
|
final int shuffleBackups = conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS, 0);
|
||||||
|
final String appenderName = shuffleLogSize > 0L && shuffleBackups > 0
|
||||||
|
? "shuffleCRLA"
|
||||||
|
: "shuffleCLA";
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
|
||||||
|
" -Djava.net.preferIPv4Stack=true" +
|
||||||
|
" -Dhadoop.metrics.log.level=WARN" +
|
||||||
|
" -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
|
||||||
|
" -Dlog4j.configuration=container-log4j.properties" +
|
||||||
|
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
|
||||||
|
" -Dyarn.app.container.log.filesize=0" +
|
||||||
|
" -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog" +
|
||||||
|
" -Dyarn.app.mapreduce.shuffle.logger=INFO," + appenderName +
|
||||||
|
" -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle" +
|
||||||
|
" -Dyarn.app.mapreduce.shuffle.log.filesize=" + shuffleLogSize +
|
||||||
|
" -Dyarn.app.mapreduce.shuffle.log.backups=" + shuffleBackups +
|
||||||
|
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
|
||||||
|
" 54321" +
|
||||||
|
" attempt_0_0000_r_000000_0" +
|
||||||
|
" 0" +
|
||||||
|
" 1><LOG_DIR>/stdout" +
|
||||||
|
" 2><LOG_DIR>/stderr ]", app.myCommandLine);
|
||||||
|
|
||||||
|
Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
|
||||||
|
app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
|
||||||
|
Assert.assertEquals("INFO,console",
|
||||||
|
app.cmdEnvironment.get("HADOOP_ROOT_LOGGER"));
|
||||||
|
Assert.assertTrue("HADOOP_CLIENT_OPTS not set for job",
|
||||||
|
app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS"));
|
||||||
|
Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testCommandLineWithLog4JConifg() throws Exception {
|
public void testCommandLineWithLog4JConifg() throws Exception {
|
||||||
|
|
||||||
|
@ -99,7 +166,7 @@ public class TestMapReduceChildJVM {
|
||||||
" -Dlog4j.configuration=" + testLogPropertieFile +
|
" -Dlog4j.configuration=" + testLogPropertieFile +
|
||||||
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
|
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
|
||||||
" -Dyarn.app.container.log.filesize=0" +
|
" -Dyarn.app.container.log.filesize=0" +
|
||||||
" -Dhadoop.root.logger=INFO,CLA" +
|
" -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog" +
|
||||||
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
|
" org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
|
||||||
" 54321" +
|
" 54321" +
|
||||||
" attempt_0_0000_m_000000_0" +
|
" attempt_0_0000_m_000000_0" +
|
||||||
|
|
|
@ -43,6 +43,9 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.Task;
|
||||||
|
import org.apache.hadoop.mapred.TaskLog;
|
||||||
import org.apache.hadoop.mapreduce.JobID;
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
@ -59,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
import org.apache.hadoop.util.ApplicationClassLoader;
|
import org.apache.hadoop.util.ApplicationClassLoader;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.ContainerLogAppender;
|
import org.apache.hadoop.yarn.ContainerLogAppender;
|
||||||
|
import org.apache.hadoop.yarn.ContainerRollingLogAppender;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
@ -68,7 +72,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.util.Apps;
|
import org.apache.hadoop.yarn.util.Apps;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.log4j.RollingFileAppender;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class for MR applications
|
* Helper class for MR applications
|
||||||
|
@ -593,17 +596,31 @@ public class MRApps extends Apps {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String getChildLogLevel(Configuration conf, boolean isMap) {
|
||||||
|
if (isMap) {
|
||||||
|
return conf.get(
|
||||||
|
MRJobConfig.MAP_LOG_LEVEL,
|
||||||
|
JobConf.DEFAULT_LOG_LEVEL.toString()
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
return conf.get(
|
||||||
|
MRJobConfig.REDUCE_LOG_LEVEL,
|
||||||
|
JobConf.DEFAULT_LOG_LEVEL.toString()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add the JVM system properties necessary to configure {@link ContainerLogAppender}.
|
* Add the JVM system properties necessary to configure
|
||||||
* @param logLevel the desired log level (eg INFO/WARN/DEBUG)
|
* {@link ContainerLogAppender} or
|
||||||
* @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)}
|
* {@link ContainerRollingLogAppender}.
|
||||||
* @param numBackups See {@link RollingFileAppender#setMaxBackupIndex(int)}
|
*
|
||||||
|
* @param task for map/reduce, or null for app master
|
||||||
* @param vargs the argument list to append to
|
* @param vargs the argument list to append to
|
||||||
* @param conf configuration of MR job
|
* @param conf configuration of MR job
|
||||||
*/
|
*/
|
||||||
public static void addLog4jSystemProperties(
|
public static void addLog4jSystemProperties(Task task,
|
||||||
String logLevel, long logSize, int numBackups, List<String> vargs,
|
List<String> vargs, Configuration conf) {
|
||||||
Configuration conf) {
|
|
||||||
String log4jPropertyFile =
|
String log4jPropertyFile =
|
||||||
conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
|
conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
|
||||||
if (log4jPropertyFile.isEmpty()) {
|
if (log4jPropertyFile.isEmpty()) {
|
||||||
|
@ -619,10 +636,29 @@ public class MRApps extends Apps {
|
||||||
vargs.add("-Dlog4j.configuration="+log4jPath.getName());
|
vargs.add("-Dlog4j.configuration="+log4jPath.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long logSize;
|
||||||
|
String logLevel;
|
||||||
|
int numBackups;
|
||||||
|
|
||||||
|
if (task == null) {
|
||||||
|
logSize = conf.getLong(MRJobConfig.MR_AM_LOG_KB,
|
||||||
|
MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10;
|
||||||
|
logLevel = conf.get(
|
||||||
|
MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
|
||||||
|
numBackups = conf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
|
||||||
|
MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
|
||||||
|
} else {
|
||||||
|
logSize = TaskLog.getTaskLogLimitBytes(conf);
|
||||||
|
logLevel = getChildLogLevel(conf, task.isMapTask());
|
||||||
|
numBackups = conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
|
||||||
|
MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
|
||||||
|
}
|
||||||
|
|
||||||
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
|
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
|
||||||
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
|
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
|
||||||
vargs.add(
|
vargs.add(
|
||||||
"-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize);
|
"-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize);
|
||||||
|
|
||||||
if (logSize > 0L && numBackups > 0) {
|
if (logSize > 0L && numBackups > 0) {
|
||||||
// log should be rolled
|
// log should be rolled
|
||||||
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "="
|
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "="
|
||||||
|
@ -631,6 +667,30 @@ public class MRApps extends Apps {
|
||||||
} else {
|
} else {
|
||||||
vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
|
vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
|
||||||
}
|
}
|
||||||
|
vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
|
||||||
|
|
||||||
|
if ( task != null
|
||||||
|
&& !task.isMapTask()
|
||||||
|
&& conf.getBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG,
|
||||||
|
MRJobConfig.DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG)) {
|
||||||
|
final int numShuffleBackups = conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS,
|
||||||
|
MRJobConfig.DEFAULT_SHUFFLE_LOG_BACKUPS);
|
||||||
|
final long shuffleLogSize = conf.getLong(MRJobConfig.SHUFFLE_LOG_KB,
|
||||||
|
MRJobConfig.DEFAULT_SHUFFLE_LOG_KB) << 10;
|
||||||
|
final String shuffleLogger = logLevel
|
||||||
|
+ (shuffleLogSize > 0L && numShuffleBackups > 0
|
||||||
|
? ",shuffleCRLA"
|
||||||
|
: ",shuffleCLA");
|
||||||
|
|
||||||
|
vargs.add("-D" + MRJobConfig.MR_PREFIX
|
||||||
|
+ "shuffle.logger=" + shuffleLogger);
|
||||||
|
vargs.add("-D" + MRJobConfig.MR_PREFIX
|
||||||
|
+ "shuffle.logfile=" + TaskLog.LogName.SYSLOG + ".shuffle");
|
||||||
|
vargs.add("-D" + MRJobConfig.MR_PREFIX
|
||||||
|
+ "shuffle.log.filesize=" + shuffleLogSize);
|
||||||
|
vargs.add("-D" + MRJobConfig.MR_PREFIX
|
||||||
|
+ "shuffle.log.backups=" + numShuffleBackups);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.mapred.FileAlreadyExistsException;
|
|
||||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
|
@ -287,7 +286,8 @@ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
|
||||||
"This method can only be called from within a Job");
|
"This method can only be called from within a Job");
|
||||||
}
|
}
|
||||||
|
|
||||||
String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r";
|
String taskType = conf.getBoolean(JobContext.TASK_ISMAP,
|
||||||
|
JobContext.DEFAULT_TASK_ISMAP) ? "m" : "r";
|
||||||
|
|
||||||
NumberFormat numberFormat = NumberFormat.getInstance();
|
NumberFormat numberFormat = NumberFormat.getInstance();
|
||||||
numberFormat.setMinimumIntegerDigits(5);
|
numberFormat.setMinimumIntegerDigits(5);
|
||||||
|
|
|
@ -473,6 +473,10 @@ public class TaskLog {
|
||||||
* @return the number of bytes to cap the log files at
|
* @return the number of bytes to cap the log files at
|
||||||
*/
|
*/
|
||||||
public static long getTaskLogLength(JobConf conf) {
|
public static long getTaskLogLength(JobConf conf) {
|
||||||
|
return getTaskLogLimitBytes(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long getTaskLogLimitBytes(Configuration conf) {
|
||||||
return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
|
return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -177,6 +177,7 @@ public interface MRJobConfig {
|
||||||
public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
|
public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
|
||||||
|
|
||||||
public static final String TASK_ISMAP = "mapreduce.task.ismap";
|
public static final String TASK_ISMAP = "mapreduce.task.ismap";
|
||||||
|
public static final boolean DEFAULT_TASK_ISMAP = true;
|
||||||
|
|
||||||
public static final String TASK_PARTITION = "mapreduce.task.partition";
|
public static final String TASK_PARTITION = "mapreduce.task.partition";
|
||||||
|
|
||||||
|
@ -773,6 +774,18 @@ public interface MRJobConfig {
|
||||||
MR_PREFIX + "task.container.log.backups";
|
MR_PREFIX + "task.container.log.backups";
|
||||||
public static final int DEFAULT_TASK_LOG_BACKUPS = 0; // don't roll
|
public static final int DEFAULT_TASK_LOG_BACKUPS = 0; // don't roll
|
||||||
|
|
||||||
|
public static final String REDUCE_SEPARATE_SHUFFLE_LOG =
|
||||||
|
MR_PREFIX + "shuffle.log.separate";
|
||||||
|
public static final boolean DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG = true;
|
||||||
|
|
||||||
|
public static final String SHUFFLE_LOG_BACKUPS =
|
||||||
|
MR_PREFIX + "shuffle.log.backups";
|
||||||
|
public static final int DEFAULT_SHUFFLE_LOG_BACKUPS = 0; // don't roll
|
||||||
|
|
||||||
|
public static final String SHUFFLE_LOG_KB =
|
||||||
|
MR_PREFIX + "shuffle.log.limit.kb";
|
||||||
|
public static final long DEFAULT_SHUFFLE_LOG_KB = 0L;
|
||||||
|
|
||||||
public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
|
public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
|
||||||
|
|
||||||
public static final String WORKFLOW_NODE_NAME =
|
public static final String WORKFLOW_NODE_NAME =
|
||||||
|
@ -812,4 +825,5 @@ public interface MRJobConfig {
|
||||||
"mapreduce.job.encrypted-intermediate-data.buffer.kb";
|
"mapreduce.job.encrypted-intermediate-data.buffer.kb";
|
||||||
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
|
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
|
||||||
128;
|
128;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -713,6 +713,34 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.app.mapreduce.shuffle.log.separate</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>If enabled ('true') logging generated by the client-side shuffle
|
||||||
|
classes in a reducer will be written in a dedicated log file
|
||||||
|
'syslog.shuffle' instead of 'syslog'.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.app.mapreduce.shuffle.log.limit.kb</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>Maximum size of the syslog.shuffle file in kilobytes
|
||||||
|
(0 for no limit).
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.app.mapreduce.shuffle.log.backups</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>If yarn.app.mapreduce.shuffle.log.limit.kb and
|
||||||
|
yarn.app.mapreduce.shuffle.log.backups are greater than zero
|
||||||
|
then a ContainerRollngLogAppender is used instead of ContainerLogAppender
|
||||||
|
for syslog.shuffle. See
|
||||||
|
org.apache.log4j.RollingFileAppender.maxBackupIndex
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.job.maxtaskfailures.per.tracker</name>
|
<name>mapreduce.job.maxtaskfailures.per.tracker</name>
|
||||||
<value>3</value>
|
<value>3</value>
|
||||||
|
|
|
@ -392,14 +392,7 @@ public class YARNRunner implements ClientProtocol {
|
||||||
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
|
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
|
||||||
+ "/bin/java");
|
+ "/bin/java");
|
||||||
|
|
||||||
// TODO: why do we use 'conf' some places and 'jobConf' others?
|
MRApps.addLog4jSystemProperties(null, vargs, conf);
|
||||||
long logSize = jobConf.getLong(MRJobConfig.MR_AM_LOG_KB,
|
|
||||||
MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10;
|
|
||||||
String logLevel = jobConf.get(
|
|
||||||
MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
|
|
||||||
int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
|
|
||||||
MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
|
|
||||||
MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf);
|
|
||||||
|
|
||||||
// Check for Java Lib Path usage in MAP and REDUCE configs
|
// Check for Java Lib Path usage in MAP and REDUCE configs
|
||||||
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
|
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
|
||||||
|
|
|
@ -38,6 +38,7 @@ public class ContainerLogAppender extends FileAppender
|
||||||
implements Flushable
|
implements Flushable
|
||||||
{
|
{
|
||||||
private String containerLogDir;
|
private String containerLogDir;
|
||||||
|
private String containerLogFile;
|
||||||
//so that log4j can configure it from the configuration(log4j.properties).
|
//so that log4j can configure it from the configuration(log4j.properties).
|
||||||
private int maxEvents;
|
private int maxEvents;
|
||||||
private Queue<LoggingEvent> tail = null;
|
private Queue<LoggingEvent> tail = null;
|
||||||
|
@ -49,7 +50,7 @@ public class ContainerLogAppender extends FileAppender
|
||||||
if (maxEvents > 0) {
|
if (maxEvents > 0) {
|
||||||
tail = new LinkedList<LoggingEvent>();
|
tail = new LinkedList<LoggingEvent>();
|
||||||
}
|
}
|
||||||
setFile(new File(this.containerLogDir, "syslog").toString());
|
setFile(new File(this.containerLogDir, containerLogFile).toString());
|
||||||
setAppend(true);
|
setAppend(true);
|
||||||
super.activateOptions();
|
super.activateOptions();
|
||||||
}
|
}
|
||||||
|
@ -102,6 +103,14 @@ public class ContainerLogAppender extends FileAppender
|
||||||
this.containerLogDir = containerLogDir;
|
this.containerLogDir = containerLogDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getContainerLogFile() {
|
||||||
|
return containerLogFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerLogFile(String containerLogFile) {
|
||||||
|
this.containerLogFile = containerLogFile;
|
||||||
|
}
|
||||||
|
|
||||||
private static final int EVENT_SIZE = 100;
|
private static final int EVENT_SIZE = 100;
|
||||||
|
|
||||||
public long getTotalLogFileSize() {
|
public long getTotalLogFileSize() {
|
||||||
|
|
|
@ -34,11 +34,12 @@ import java.io.Flushable;
|
||||||
public class ContainerRollingLogAppender extends RollingFileAppender
|
public class ContainerRollingLogAppender extends RollingFileAppender
|
||||||
implements Flushable {
|
implements Flushable {
|
||||||
private String containerLogDir;
|
private String containerLogDir;
|
||||||
|
private String containerLogFile;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void activateOptions() {
|
public void activateOptions() {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
setFile(new File(this.containerLogDir, "syslog").toString());
|
setFile(new File(this.containerLogDir, containerLogFile).toString());
|
||||||
setAppend(true);
|
setAppend(true);
|
||||||
super.activateOptions();
|
super.activateOptions();
|
||||||
}
|
}
|
||||||
|
@ -62,4 +63,12 @@ public class ContainerRollingLogAppender extends RollingFileAppender
|
||||||
public void setContainerLogDir(String containerLogDir) {
|
public void setContainerLogDir(String containerLogDir) {
|
||||||
this.containerLogDir = containerLogDir;
|
this.containerLogDir = containerLogDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getContainerLogFile() {
|
||||||
|
return containerLogFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerLogFile(String containerLogFile) {
|
||||||
|
this.containerLogFile = containerLogFile;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ public class TestContainerLogAppender {
|
||||||
claAppender.setName("testCLA");
|
claAppender.setName("testCLA");
|
||||||
claAppender.setLayout(new PatternLayout("%-5p [%t]: %m%n"));
|
claAppender.setLayout(new PatternLayout("%-5p [%t]: %m%n"));
|
||||||
claAppender.setContainerLogDir("target/testAppendInClose/logDir");
|
claAppender.setContainerLogDir("target/testAppendInClose/logDir");
|
||||||
|
claAppender.setContainerLogFile("syslog");
|
||||||
claAppender.setTotalLogFileSize(1000);
|
claAppender.setTotalLogFileSize(1000);
|
||||||
claAppender.activateOptions();
|
claAppender.activateOptions();
|
||||||
final Logger claLog = Logger.getLogger("testAppendInClose-catergory");
|
final Logger claLog = Logger.getLogger("testAppendInClose-catergory");
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
#
|
#
|
||||||
# Define some default values that can be overridden by system properties
|
# Define some default values that can be overridden by system properties
|
||||||
hadoop.root.logger=DEBUG,CLA
|
hadoop.root.logger=DEBUG,CLA
|
||||||
|
yarn.app.mapreduce.shuffle.logger=${hadoop.root.logger}
|
||||||
|
|
||||||
# Define the root logger to the system property "hadoop.root.logger".
|
# Define the root logger to the system property "hadoop.root.logger".
|
||||||
log4j.rootLogger=${hadoop.root.logger}, EventCounter
|
log4j.rootLogger=${hadoop.root.logger}, EventCounter
|
||||||
|
@ -30,18 +31,44 @@ yarn.app.container.log.filesize=100
|
||||||
|
|
||||||
log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
|
log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
|
||||||
log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
|
log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
|
||||||
|
log4j.appender.CLA.containerLogFile=${hadoop.root.logfile}
|
||||||
log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
|
log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
|
||||||
|
|
||||||
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
|
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
|
||||||
log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
|
log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
|
||||||
|
|
||||||
log4j.appender.CRLA=org.apache.hadoop.yarn.ContainerRollingLogAppender
|
log4j.appender.CRLA=org.apache.hadoop.yarn.ContainerRollingLogAppender
|
||||||
log4j.appender.CRLA.containerLogDir=${yarn.app.container.log.dir}
|
log4j.appender.CRLA.containerLogDir=${yarn.app.container.log.dir}
|
||||||
|
log4j.appender.CRLA.containerLogFile=${hadoop.root.logfile}
|
||||||
log4j.appender.CRLA.maximumFileSize=${yarn.app.container.log.filesize}
|
log4j.appender.CRLA.maximumFileSize=${yarn.app.container.log.filesize}
|
||||||
log4j.appender.CRLA.maxBackupIndex=${yarn.app.container.log.backups}
|
log4j.appender.CRLA.maxBackupIndex=${yarn.app.container.log.backups}
|
||||||
log4j.appender.CRLA.layout=org.apache.log4j.PatternLayout
|
log4j.appender.CRLA.layout=org.apache.log4j.PatternLayout
|
||||||
log4j.appender.CRLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
|
log4j.appender.CRLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
|
||||||
|
|
||||||
|
log4j.appender.shuffleCLA=org.apache.hadoop.yarn.ContainerLogAppender
|
||||||
|
log4j.appender.shuffleCLA.containerLogDir=${yarn.app.container.log.dir}
|
||||||
|
log4j.appender.shuffleCLA.containerLogFile=${yarn.app.mapreduce.shuffle.logfile}
|
||||||
|
log4j.appender.shuffleCLA.totalLogFileSize=${yarn.app.mapreduce.shuffle.log.filesize}
|
||||||
|
log4j.appender.shuffleCLA.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.shuffleCLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
|
||||||
|
|
||||||
|
log4j.appender.shuffleCRLA=org.apache.hadoop.yarn.ContainerRollingLogAppender
|
||||||
|
log4j.appender.shuffleCRLA.containerLogDir=${yarn.app.container.log.dir}
|
||||||
|
log4j.appender.shuffleCRLA.containerLogFile=${yarn.app.mapreduce.shuffle.logfile}
|
||||||
|
log4j.appender.shuffleCRLA.maximumFileSize=${yarn.app.mapreduce.shuffle.log.filesize}
|
||||||
|
log4j.appender.shuffleCRLA.maxBackupIndex=${yarn.app.mapreduce.shuffle.log.backups}
|
||||||
|
log4j.appender.shuffleCRLA.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.shuffleCRLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
# Shuffle Logger
|
||||||
|
#
|
||||||
|
log4j.logger.org.apache.hadoop.mapreduce.task.reduce=${yarn.app.mapreduce.shuffle.logger}
|
||||||
|
log4j.additivity.org.apache.hadoop.mapreduce.task.reduce=false
|
||||||
|
# Merger is used for both map-side and reduce-side spill merging. On the map
|
||||||
|
# side yarn.app.mapreduce.shuffle.logger == hadoop.root.logger
|
||||||
|
#
|
||||||
|
log4j.logger.org.apache.hadoop.mapred.Merger=${yarn.app.mapreduce.shuffle.logger}
|
||||||
|
log4j.additivity.org.apache.hadoop.mapred.Merger=false
|
||||||
#
|
#
|
||||||
# Event Counter Appender
|
# Event Counter Appender
|
||||||
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
|
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
|
||||||
|
|
Loading…
Reference in New Issue