svn merge -c 1560148 FIXES: MAPREDUCE-5693. Restore MRv1 behavior for log flush. Contributed by Gera Shegalov

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1560149 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-01-21 19:27:57 +00:00
parent e3774a41b7
commit 033d60a28f
6 changed files with 113 additions and 8 deletions

View File

@ -137,6 +137,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5729. mapred job -list throws NPE (kasha)
MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
jlowe)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@ import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager;
/**
* The main() for MapReduce task processes.
@ -123,6 +123,7 @@ class YarnChild {
LOG.debug("PID: " + System.getenv().get("JVM_PID"));
Task task = null;
UserGroupInformation childUGI = null;
ScheduledExecutorService logSyncer = null;
try {
int idleLoopCount = 0;
@ -155,6 +156,8 @@ class YarnChild {
// set job classloader if configured before invoking the task
MRApps.setJobClassLoader(job);
logSyncer = TaskLog.createLogSyncer();
// Create a final reference to the task for the doAs block
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@ -208,10 +211,7 @@ class YarnChild {
} finally {
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
TaskLog.syncLogsShutdown(logSyncer);
}
}

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -209,6 +211,7 @@ public class MRAppMaster extends CompositeService {
boolean errorHappenedShutDown = false;
private String shutDownMessage = null;
JobStateInternal forcedState = null;
private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0;
@ -237,6 +240,7 @@ public class MRAppMaster extends CompositeService {
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
this.maxAppAttempts = maxAppAttempts;
logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@ -1064,6 +1068,12 @@ public class MRAppMaster extends CompositeService {
// All components have started, start the job.
startJobs();
}
@Override
public void stop() {
super.stop();
TaskLog.syncLogsShutdown(logSyncer);
}
private void processRecovery() {
if (appAttemptID.getAttemptId() == 1) {

View File

@ -23,12 +23,17 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@ -262,7 +269,86 @@ public class TaskLog {
}
writeToIndexFile(logLocation, isCleanup);
}
public static synchronized void syncLogsShutdown(
ScheduledExecutorService scheduler)
{
// flush standard streams
//
System.out.flush();
System.err.flush();
if (scheduler != null) {
scheduler.shutdownNow();
}
// flush & close all appenders
LogManager.shutdown();
}
@SuppressWarnings("unchecked")
public static synchronized void syncLogs() {
// flush standard streams
//
System.out.flush();
System.err.flush();
// flush flushable appenders
//
final Logger rootLogger = Logger.getRootLogger();
flushAppenders(rootLogger);
final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
getCurrentLoggers();
while (allLoggers.hasMoreElements()) {
final Logger l = allLoggers.nextElement();
flushAppenders(l);
}
}
@SuppressWarnings("unchecked")
private static void flushAppenders(Logger l) {
final Enumeration<Appender> allAppenders = l.getAllAppenders();
while (allAppenders.hasMoreElements()) {
final Appender a = allAppenders.nextElement();
if (a instanceof Flushable) {
try {
((Flushable) a).flush();
} catch (IOException ioe) {
System.err.println(a + ": Failed to flush!"
+ StringUtils.stringifyException(ioe));
}
}
}
}
public static ScheduledExecutorService createLogSyncer() {
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Thread for syncLogs");
return t;
}
});
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
TaskLog.syncLogsShutdown(scheduler);
}
}, 50);
scheduler.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
TaskLog.syncLogs();
}
}, 0L, 5L, TimeUnit.SECONDS);
return scheduler;
}
/**
* The filter for userlogs.
*/

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.Flushable;
import java.util.LinkedList;
import java.util.Queue;
@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent;
*
*/
@InterfaceStability.Unstable
public class TaskLogAppender extends FileAppender {
public class TaskLogAppender extends FileAppender implements Flushable {
private String taskId; //taskId should be managed as String rather than TaskID object
//so that log4j can configure it from the configuration(log4j.properties).
private Integer maxEvents;
@ -92,6 +93,7 @@ public class TaskLogAppender extends FileAppender {
}
}
@Override
public void flush() {
if (qw != null) {
qw.flush();

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn;
import java.io.File;
import java.io.Flushable;
import java.util.LinkedList;
import java.util.Queue;
@ -33,7 +34,9 @@ import org.apache.log4j.spi.LoggingEvent;
*/
@Public
@Unstable
public class ContainerLogAppender extends FileAppender {
public class ContainerLogAppender extends FileAppender
implements Flushable
{
private String containerLogDir;
//so that log4j can configure it from the configuration(log4j.properties).
private int maxEvents;
@ -65,6 +68,7 @@ public class ContainerLogAppender extends FileAppender {
}
}
@Override
public void flush() {
if (qw != null) {
qw.flush();