diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 104bcff9fc2..f10ba51d523 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -285,6 +285,9 @@ Release 2.4.0 - UNRELEASED
 
     MAPREDUCE-5729. mapred job -list throws NPE (kasha)
 
+    MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
+    jlowe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index d8f4f76d845..29e82d71ccc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -27,6 +27,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,7 +62,6 @@
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.LogManager;
 
 /**
  * The main() for MapReduce task processes.
@@ -123,6 +123,7 @@ public TaskUmbilicalProtocol run() throws Exception {
     LOG.debug("PID: " + System.getenv().get("JVM_PID"));
     Task task = null;
     UserGroupInformation childUGI = null;
+    ScheduledExecutorService logSyncer = null;
 
     try {
       int idleLoopCount = 0;
@@ -161,6 +162,8 @@ public TaskUmbilicalProtocol run() throws Exception {
       // set job classloader if configured before invoking the task
       MRApps.setJobClassLoader(job);
 
+      logSyncer = TaskLog.createLogSyncer();
+
       // Create a final reference to the task for the doAs block
       final Task taskFinal = task;
       childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@@ -214,10 +217,7 @@ public Object run() throws Exception {
     } finally {
       RPC.stopProxy(umbilical);
       DefaultMetricsSystem.shutdown();
-      // Shutting down log4j of the child-vm...
-      // This assumes that on return from Task.run()
-      // there is no more logging done.
-      LogManager.shutdown();
+      TaskLog.syncLogsShutdown(logSyncer);
     }
   }
 
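For context, the YarnChild change replaces the one-shot log4j teardown with a syncer whose lifetime brackets the task. A minimal sketch of the resulting call pattern in a task JVM (the harness class is illustrative; only createLogSyncer/syncLogsShutdown come from this patch):

import java.util.concurrent.ScheduledExecutorService;

import org.apache.hadoop.mapred.TaskLog;

public class TaskJvmHarness {
  public static void main(String[] args) {
    ScheduledExecutorService logSyncer = null;
    try {
      // start the periodic flush before any task code runs
      logSyncer = TaskLog.createLogSyncer();
      // ... run the task; log output is now flushed every few seconds ...
    } finally {
      // replaces the bare LogManager.shutdown(): flush stdout/stderr,
      // stop the syncer thread, then close the log4j appenders
      TaskLog.syncLogsShutdown(logSyncer);
    }
  }
}
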
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 6fafdb5aa23..2bae130a4e3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -31,6 +31,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
@@ -45,6 +46,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -212,6 +214,7 @@ public class MRAppMaster extends CompositeService {
   boolean errorHappenedShutDown = false;
   private String shutDownMessage = null;
   JobStateInternal forcedState = null;
+  private final ScheduledExecutorService logSyncer;
 
   private long recoveredJobStartTime = 0;
 
@@ -240,6 +243,7 @@ public MRAppMaster(ApplicationAttemptId applicationAttemptId,
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
     this.maxAppAttempts = maxAppAttempts;
+    logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
 
@@ -1078,6 +1082,12 @@ protected void serviceStart() throws Exception {
     // All components have started, start the job.
     startJobs();
   }
+
+  @Override
+  public void stop() {
+    super.stop();
+    TaskLog.syncLogsShutdown(logSyncer);
+  }
 
   private void processRecovery() {
     if (appAttemptID.getAttemptId() == 1) {
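The stop() override performs the final flush only after super.stop() has stopped every child service, so shutdown-time logging from those services still reaches disk. The same ordering, sketched for an arbitrary CompositeService (the class name is illustrative, not from the patch):

import java.util.concurrent.ScheduledExecutorService;

import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.service.CompositeService;

public class LogFlushingService extends CompositeService {
  // created alongside the service, shut down after it
  private final ScheduledExecutorService logSyncer = TaskLog.createLogSyncer();

  public LogFlushingService() {
    super("LogFlushingService");
  }

  @Override
  public void stop() {
    super.stop();                        // children stop first; they may still log
    TaskLog.syncLogsShutdown(logSyncer); // then the last flush and log4j close
  }
}
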
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
index d929813b370..48640ab79c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
@@ -23,12 +23,17 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +49,8 @@
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -262,7 +269,86 @@ public synchronized static void syncLogs(String logLocation,
     }
     writeToIndexFile(logLocation, isCleanup);
   }
-
+
+  public static synchronized void syncLogsShutdown(
+      ScheduledExecutorService scheduler)
+  {
+    // flush standard streams
+    //
+    System.out.flush();
+    System.err.flush();
+
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+    }
+
+    // flush & close all appenders
+    LogManager.shutdown();
+  }
+
+  @SuppressWarnings("unchecked")
+  public static synchronized void syncLogs() {
+    // flush standard streams
+    //
+    System.out.flush();
+    System.err.flush();
+
+    // flush flushable appenders
+    //
+    final Logger rootLogger = Logger.getRootLogger();
+    flushAppenders(rootLogger);
+    final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
+        getCurrentLoggers();
+    while (allLoggers.hasMoreElements()) {
+      final Logger l = allLoggers.nextElement();
+      flushAppenders(l);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void flushAppenders(Logger l) {
+    final Enumeration<Appender> allAppenders = l.getAllAppenders();
+    while (allAppenders.hasMoreElements()) {
+      final Appender a = allAppenders.nextElement();
+      if (a instanceof Flushable) {
+        try {
+          ((Flushable) a).flush();
+        } catch (IOException ioe) {
+          System.err.println(a + ": Failed to flush!"
+              + StringUtils.stringifyException(ioe));
+        }
+      }
+    }
+  }
+
+  public static ScheduledExecutorService createLogSyncer() {
+    final ScheduledExecutorService scheduler =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactory() {
+              @Override
+              public Thread newThread(Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setDaemon(true);
+                t.setName("Thread for syncLogs");
+                return t;
+              }
+            });
+    ShutdownHookManager.get().addShutdownHook(new Runnable() {
+      @Override
+      public void run() {
+        TaskLog.syncLogsShutdown(scheduler);
+      }
+    }, 50);
+    scheduler.scheduleWithFixedDelay(
+        new Runnable() {
+          @Override
+          public void run() {
+            TaskLog.syncLogs();
+          }
+        }, 0L, 5L, TimeUnit.SECONDS);
+    return scheduler;
+  }
+
   /**
    * The filter for userlogs.
    */
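createLogSyncer() wires up three cooperating pieces: a single daemon thread that calls syncLogs() every five seconds, a shutdown hook (priority 50) that performs the final flush, and the returned handle that lets callers stop the syncer deterministically. A standalone sketch of how a caller would exercise it (the demo class is illustrative):

import java.util.concurrent.ScheduledExecutorService;

import org.apache.hadoop.mapred.TaskLog;
import org.apache.log4j.Logger;

public class LogSyncerDemo {
  private static final Logger LOG = Logger.getLogger(LogSyncerDemo.class);

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService syncer = TaskLog.createLogSyncer();
    try {
      LOG.info("buffered in the appender, on disk within ~5 seconds");
      Thread.sleep(10000L);
    } finally {
      // the shutdown hook performs the same teardown if this is skipped
      TaskLog.syncLogsShutdown(syncer);
    }
  }
}
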
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
index 0b79837f62d..2162a2602ca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.Flushable;
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -31,7 +32,7 @@
  *
  */
 @InterfaceStability.Unstable
-public class TaskLogAppender extends FileAppender {
+public class TaskLogAppender extends FileAppender implements Flushable {
   private String taskId; //taskId should be managed as String rather than TaskID object
   //so that log4j can configure it from the configuration(log4j.properties).
   private Integer maxEvents;
@@ -92,6 +93,7 @@ public void append(LoggingEvent event) {
     }
   }
 
+  @Override
   public void flush() {
     if (qw != null) {
       qw.flush();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
index 23988b0dcd6..95876efa453 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn;
 
 import java.io.File;
+import java.io.Flushable;
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -33,7 +34,9 @@
  */
 @Public
 @Unstable
-public class ContainerLogAppender extends FileAppender {
+public class ContainerLogAppender extends FileAppender
+  implements Flushable
+{
   private String containerLogDir;
   //so that log4j can configure it from the configuration(log4j.properties).
   private int maxEvents;
@@ -65,6 +68,7 @@ public void append(LoggingEvent event) {
     }
   }
 
+  @Override
   public void flush() {
     if (qw != null) {
       qw.flush();
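TaskLog.syncLogs() only flushes appenders that implement java.io.Flushable, which is why both appenders above now declare it. A hypothetical custom buffering appender would opt in the same way (the class name is illustrative; qw is the protected QuietWriter inherited from log4j's WriterAppender):

import java.io.Flushable;

import org.apache.log4j.WriterAppender;

public class BufferedTaskAppender extends WriterAppender implements Flushable {
  @Override
  public void flush() {
    // push any events buffered in the underlying writer out to the stream;
    // without implementing Flushable, syncLogs() skips this appender entirely
    if (qw != null) {
      qw.flush();
    }
  }
}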