diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index b04dac5f0a1..27e52544c67 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -58,6 +59,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,6 +96,12 @@ public class TaskAttemptListenerImpl extends CompositeService AtomicReference> attemptIdToStatus = new ConcurrentHashMap<>(); + /** + * Per task attempt record of the last logged progress and log time. 
+ */ + private ConcurrentHashMap + taskAttemptLogProgressStamps = new ConcurrentHashMap<>(); + private Set launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap()); @@ -123,10 +131,12 @@ public class TaskAttemptListenerImpl extends CompositeService @Override protected void serviceInit(Configuration conf) throws Exception { - registerHeartbeatHandler(conf); - commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, - MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); - super.serviceInit(conf); + registerHeartbeatHandler(conf); + commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, + MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); + // initialize the delta threshold for logging the task progress. + MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf); + super.serviceInit(conf); } @Override @@ -410,8 +420,10 @@ public class TaskAttemptListenerImpl extends CompositeService taskAttemptStatus.id = yarnAttemptID; // Task sends the updated progress to the TT. taskAttemptStatus.progress = taskStatus.getProgress(); - LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : " - + taskStatus.getProgress()); + // log the new progress + taskAttemptLogProgressStamps.computeIfAbsent(taskAttemptID, + k -> new TaskProgressLogPair(taskAttemptID)) + .update(taskStatus.getProgress()); // Task sends the updated state-string to the TT. taskAttemptStatus.stateString = taskStatus.getStateString(); // Task sends the updated phase to the TT. @@ -637,4 +649,68 @@ public class TaskAttemptListenerImpl extends CompositeService AtomicReference> getAttemptIdToStatus() { return attemptIdToStatus; } + + /** + * Entity to keep track of the taskAttempt, last time it was logged, + * and the + * progress that has been logged. + */ + class TaskProgressLogPair { + + /** + * The task attempt this history record belongs to. + */ + private final TaskAttemptID taskAttemptID; + /** + * Monotonic timestamp (ms) of the last time the progress was logged. 
+ */ + private volatile long logTimeStamp; + /** + * Last logged progress, as returned by convertTaskProgressToFactor. + */ + private volatile double prevProgress; + + TaskProgressLogPair(final TaskAttemptID attemptID) { + taskAttemptID = attemptID; + prevProgress = 0.0; + logTimeStamp = 0; + } + + private void resetLog(final boolean doLog, + final float progress, final double processedProgress, + final long timestamp) { + if (doLog) { + prevProgress = processedProgress; + logTimeStamp = timestamp; + LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : " + + progress); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : " + + progress); + } + } + } + + public void update(final float progress) { + final double processedProgress = + MRJobConfUtil.convertTaskProgressToFactor(progress); + final double diffProgress = processedProgress - prevProgress; + final long currentTime = Time.monotonicNow(); + boolean result = + (Double.compare(diffProgress, + MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0); + if (!result) { + // check if time has expired. + result = ((currentTime - logTimeStamp) + >= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold()); + } + // It is helpful to log the progress when it reaches 1.0F. 
+ if (Float.compare(progress, 1.0f) == 0) { + result = true; + taskAttemptLogProgressStamps.remove(taskAttemptID); + } + resetLog(result, progress, processedProgress, currentTime); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java index b3cefc61fc1..49b986e2259 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java @@ -51,7 +51,7 @@ public class TestTaskAttemptFinishingMonitor { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); - + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); AppContext appCtx = mock(AppContext.class); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); RMHeartbeatHandler rmHeartbeatHandler = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 068ebfa2537..6b59784dbdb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -511,6 +511,8 @@ 
public class TestTaskAttemptListenerImpl { Configuration conf = new Configuration(); conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); + conf.setLong(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 1); tal.init(conf); tal.start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index a2f0abaaa3d..44a4760eb68 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -271,6 +271,7 @@ public class TestFail { protected void serviceInit(Configuration conf) throws Exception { conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); super.serviceInit(conf); } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index 5d86479ef87..ca03958ba06 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -58,6 +58,7 @@ public 
class TestTaskHeartbeatHandler { // so that TASK_TIMEOUT is not overridden conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5); conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); hb.init(conf); hb.start(); @@ -117,6 +118,7 @@ public class TestTaskHeartbeatHandler { new TaskHeartbeatHandler(mockHandler, clock, 1); Configuration conf = new Configuration(); conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); hb.init(conf); hb.start(); try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 4142c0175e6..71b1ef2f505 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -363,6 +363,29 @@ public interface MRJobConfig { public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000; + /** + * TaskAttemptListenerImpl will log the task progress when the delta progress + * is larger than or equal to the defined value. + * The double value has to be between 0 and 1, with two decimals. + */ + String TASK_LOG_PROGRESS_DELTA_THRESHOLD = + "mapreduce.task.log.progress.delta.threshold"; + /** + * Default delta progress is set to 5%. + */ + double TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT = 0.05; + /** + * TaskAttemptListenerImpl will log the task progress when the defined value + * in seconds expires. + * This helps to debug task attempts that are making very slow progress. 
+ */ + String TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS = + "mapreduce.task.log.progress.wait.interval-seconds"; + /** + * Default period to log the task attempt progress is 60 seconds. + */ + long TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT = 60L; + public static final String TASK_ID = "mapreduce.task.id"; public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java index afedef3785f..4e4e78e1e3c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapreduce.util; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -58,4 +59,75 @@ public final class MRJobConfUtil { } public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f; + + /** + * Configurations to control the frequency of logging of task attempts. + */ + public static final double PROGRESS_MIN_DELTA_FACTOR = 100.0; + private static volatile Double progressMinDeltaThreshold = null; + private static volatile Long progressMaxWaitDeltaTimeThreshold = null; + + /** + * Loads the values defined in the configuration, including the delta + * progress and the maximum time between each log message. 
+ * @param conf the configuration to read; thresholds already set are kept. + */ + public static void setTaskLogProgressDeltaThresholds( + final Configuration conf) { + if (progressMinDeltaThreshold == null) { + progressMinDeltaThreshold = + Double.valueOf(PROGRESS_MIN_DELTA_FACTOR + * conf.getDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, + MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT)); + } + + if (progressMaxWaitDeltaTimeThreshold == null) { + progressMaxWaitDeltaTimeThreshold = + TimeUnit.SECONDS.toMillis(conf + .getLong( + MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, + MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT)); + } + } + + /** + * Retrieves the min delta progress required to log the task attempt current + * progress. + * @return the defined threshold in the conf. + * returns the default value if + * {@link #setTaskLogProgressDeltaThresholds} has not been called. + */ + public static double getTaskProgressMinDeltaThreshold() { + if (progressMinDeltaThreshold == null) { + return PROGRESS_MIN_DELTA_FACTOR + * MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT; + } + return progressMinDeltaThreshold.doubleValue(); + } + + /** + * Retrieves the wait time, in milliseconds, after which the task attempt + * current progress is logged. + * @return the defined threshold in the conf. + * returns the default value if + * {@link #setTaskLogProgressDeltaThresholds} has not been called. + */ + public static long getTaskProgressWaitDeltaTimeThreshold() { + if (progressMaxWaitDeltaTimeThreshold == null) { + return TimeUnit.SECONDS.toMillis( + MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT); + } + return progressMaxWaitDeltaTimeThreshold.longValue(); + } + + /** + * Converts a progress between 0.0 and 1.0 to the double format used to log the + * task attempt. + * @param progress of the task which is a value between 0.0 and 1.0. 
+ * @return the double value that is less than or equal to the argument + * multiplied by {@link #PROGRESS_MIN_DELTA_FACTOR}. 
+ */ + public static double convertTaskProgressToFactor(final float progress) { + return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR); + } }