From 033d60a28fbdc637d7cbaaa33e65f72518fecafb Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe
Date: Tue, 21 Jan 2014 19:27:57 +0000
Subject: [PATCH] svn merge -c 1560148 FIXES: MAPREDUCE-5693. Restore MRv1 behavior for log flush. Contributed by Gera Shegalov

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1560149 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt               |  3 +
 .../org/apache/hadoop/mapred/YarnChild.java        | 10 +--
 .../hadoop/mapreduce/v2/app/MRAppMaster.java       | 10 +++
 .../org/apache/hadoop/mapred/TaskLog.java          | 88 ++++++++++++++++++-
 .../apache/hadoop/mapred/TaskLogAppender.java      |  4 +-
 .../hadoop/yarn/ContainerLogAppender.java          |  6 +-
 6 files changed, 113 insertions(+), 8 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 968e277158a..3c66dbe3146 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -137,6 +137,9 @@ Release 2.4.0 - UNRELEASED
 
     MAPREDUCE-5729. mapred job -list throws NPE (kasha)
 
+    MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
+    jlowe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index 8fa487066aa..2c5ded1e7cb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -27,6 +27,7 @@ import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.log4j.LogManager;
 
 /**
  * The main() for MapReduce task processes.
@@ -123,6 +123,7 @@ class YarnChild {
     LOG.debug("PID: " + System.getenv().get("JVM_PID"));
     Task task = null;
     UserGroupInformation childUGI = null;
+    ScheduledExecutorService logSyncer = null;
 
     try {
       int idleLoopCount = 0;
@@ -155,6 +156,8 @@ class YarnChild {
       // set job classloader if configured before invoking the task
       MRApps.setJobClassLoader(job);
 
+      logSyncer = TaskLog.createLogSyncer();
+
       // Create a final reference to the task for the doAs block
       final Task taskFinal = task;
       childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@@ -208,10 +211,7 @@ class YarnChild {
     } finally {
       RPC.stopProxy(umbilical);
       DefaultMetricsSystem.shutdown();
-      // Shutting down log4j of the child-vm...
-      // This assumes that on return from Task.run()
-      // there is no more logging done.
-      LogManager.shutdown();
+      TaskLog.syncLogsShutdown(logSyncer);
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 63628e4b0aa..378cf0eaedd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
@@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -209,6 +211,7 @@ public class MRAppMaster extends CompositeService {
   boolean errorHappenedShutDown = false;
   private String shutDownMessage = null;
   JobStateInternal forcedState = null;
+  private final ScheduledExecutorService logSyncer;
 
   private long recoveredJobStartTime = 0;
 
@@ -237,6 +240,7 @@ public class MRAppMaster extends CompositeService {
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
     this.maxAppAttempts = maxAppAttempts;
+    logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
 
@@ -1064,6 +1068,12 @@ public class MRAppMaster extends CompositeService {
     // All components have started, start the job.
     startJobs();
   }
+
+  @Override
+  public void stop() {
+    super.stop();
+    TaskLog.syncLogsShutdown(logSyncer);
+  }
 
   private void processRecovery() {
     if (appAttemptID.getAttemptId() == 1) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
index d929813b370..48640ab79c4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
@@ -23,12 +23,17 @@ import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.Flushable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -262,7 +269,86 @@ public class TaskLog {
     }
     writeToIndexFile(logLocation, isCleanup);
   }
-  
+
+  public static synchronized void syncLogsShutdown(
+    ScheduledExecutorService scheduler)
+  {
+    // flush standard streams
+    //
+    System.out.flush();
+    System.err.flush();
+
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+    }
+
+    // flush & close all appenders
+    LogManager.shutdown();
+  }
+
+  @SuppressWarnings("unchecked")
+  public static synchronized void syncLogs() {
+    // flush standard streams
+    //
+    System.out.flush();
+    System.err.flush();
+
+    // flush flushable appenders
+    //
+    final Logger rootLogger = Logger.getRootLogger();
+    flushAppenders(rootLogger);
+    final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
+      getCurrentLoggers();
+    while (allLoggers.hasMoreElements()) {
+      final Logger l = allLoggers.nextElement();
+      flushAppenders(l);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void flushAppenders(Logger l) {
+    final Enumeration<Appender> allAppenders = l.getAllAppenders();
+    while (allAppenders.hasMoreElements()) {
+      final Appender a = allAppenders.nextElement();
+      if (a instanceof Flushable) {
+        try {
+          ((Flushable) a).flush();
+        } catch (IOException ioe) {
+          System.err.println(a + ": Failed to flush!"
+            + StringUtils.stringifyException(ioe));
+        }
+      }
+    }
+  }
+
+  public static ScheduledExecutorService createLogSyncer() {
+    final ScheduledExecutorService scheduler =
+      Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            final Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setDaemon(true);
+            t.setName("Thread for syncLogs");
+            return t;
+          }
+        });
+    ShutdownHookManager.get().addShutdownHook(new Runnable() {
+      @Override
+      public void run() {
+        TaskLog.syncLogsShutdown(scheduler);
+      }
+    }, 50);
+    scheduler.scheduleWithFixedDelay(
+        new Runnable() {
+          @Override
+          public void run() {
+            TaskLog.syncLogs();
+          }
+        }, 0L, 5L, TimeUnit.SECONDS);
+    return scheduler;
+  }
+
   /**
    * The filter for userlogs.
    */
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
index 0b79837f62d..2162a2602ca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.Flushable;
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent;
  * 
  */
 @InterfaceStability.Unstable
-public class TaskLogAppender extends FileAppender {
+public class TaskLogAppender extends FileAppender implements Flushable {
   private String taskId; //taskId should be managed as String rather than TaskID object
   //so that log4j can configure it from the configuration(log4j.properties). 
   private Integer maxEvents;
@@ -92,6 +93,7 @@ public class TaskLogAppender extends FileAppender {
     }
   }
 
+  @Override
   public void flush() {
     if (qw != null) {
       qw.flush();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
index 23988b0dcd6..95876efa453 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn;
 
 import java.io.File;
+import java.io.Flushable;
 import java.util.LinkedList;
 import java.util.Queue;
 
@@ -33,7 +34,9 @@ import org.apache.log4j.spi.LoggingEvent;
  */
 @Public
 @Unstable
-public class ContainerLogAppender extends FileAppender {
+public class ContainerLogAppender extends FileAppender
+  implements Flushable
+{
   private String containerLogDir;
   //so that log4j can configure it from the configuration(log4j.properties). 
   private int maxEvents;
@@ -65,6 +68,7 @@ public class ContainerLogAppender extends FileAppender {
     }
   }
 
+  @Override
   public void flush() {
     if (qw != null) {
       qw.flush();