MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein)
(cherry picked from commit 11d17417ce
)
This commit is contained in:
parent
e1dd78143b
commit
b397a3a875
|
@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
|
|||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
|
@ -58,6 +59,7 @@ import org.apache.hadoop.net.NetUtils;
|
|||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.StringInterner;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -94,6 +96,12 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
AtomicReference<TaskAttemptStatus>> attemptIdToStatus
|
||||
= new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* A Map to keep track of the history of logging each task attempt.
|
||||
*/
|
||||
private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair>
|
||||
taskAttemptLogProgressStamps = new ConcurrentHashMap<>();
|
||||
|
||||
private Set<WrappedJvmID> launchedJVMs = Collections
|
||||
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
|
||||
|
||||
|
@ -126,6 +134,8 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
registerHeartbeatHandler(conf);
|
||||
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
|
||||
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
|
||||
// initialize the delta threshold for logging the task progress.
|
||||
MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -410,8 +420,10 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
taskAttemptStatus.id = yarnAttemptID;
|
||||
// Task sends the updated progress to the TT.
|
||||
taskAttemptStatus.progress = taskStatus.getProgress();
|
||||
LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
|
||||
+ taskStatus.getProgress());
|
||||
// log the new progress
|
||||
taskAttemptLogProgressStamps.computeIfAbsent(taskAttemptID,
|
||||
k -> new TaskProgressLogPair(taskAttemptID))
|
||||
.update(taskStatus.getProgress());
|
||||
// Task sends the updated state-string to the TT.
|
||||
taskAttemptStatus.stateString = taskStatus.getStateString();
|
||||
// Task sends the updated phase to the TT.
|
||||
|
@ -637,4 +649,68 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
|
||||
return attemptIdToStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Entity to keep track of the taskAttempt, last time it was logged,
|
||||
* and the
|
||||
* progress that has been logged.
|
||||
*/
|
||||
class TaskProgressLogPair {
|
||||
|
||||
/**
|
||||
* The taskAttemptId of that history record.
|
||||
*/
|
||||
private final TaskAttemptID taskAttemptID;
|
||||
/**
|
||||
* Timestamp of last time the progress was logged.
|
||||
*/
|
||||
private volatile long logTimeStamp;
|
||||
/**
|
||||
* Snapshot of the last logged progress.
|
||||
*/
|
||||
private volatile double prevProgress;
|
||||
|
||||
TaskProgressLogPair(final TaskAttemptID attemptID) {
|
||||
taskAttemptID = attemptID;
|
||||
prevProgress = 0.0;
|
||||
logTimeStamp = 0;
|
||||
}
|
||||
|
||||
private void resetLog(final boolean doLog,
|
||||
final float progress, final double processedProgress,
|
||||
final long timestamp) {
|
||||
if (doLog) {
|
||||
prevProgress = processedProgress;
|
||||
logTimeStamp = timestamp;
|
||||
LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
|
||||
+ progress);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : "
|
||||
+ progress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void update(final float progress) {
|
||||
final double processedProgress =
|
||||
MRJobConfUtil.convertTaskProgressToFactor(progress);
|
||||
final double diffProgress = processedProgress - prevProgress;
|
||||
final long currentTime = Time.monotonicNow();
|
||||
boolean result =
|
||||
(Double.compare(diffProgress,
|
||||
MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0);
|
||||
if (!result) {
|
||||
// check if time has expired.
|
||||
result = ((currentTime - logTimeStamp)
|
||||
>= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold());
|
||||
}
|
||||
// It is helpful to log the progress when it reaches 1.0F.
|
||||
if (Float.compare(progress, 1.0f) == 0) {
|
||||
result = true;
|
||||
taskAttemptLogProgressStamps.remove(taskAttemptID);
|
||||
}
|
||||
resetLog(result, progress, processedProgress, currentTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public class TestTaskAttemptFinishingMonitor {
|
|||
Configuration conf = new Configuration();
|
||||
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
|
||||
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
|
||||
|
||||
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
|
||||
AppContext appCtx = mock(AppContext.class);
|
||||
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
||||
RMHeartbeatHandler rmHeartbeatHandler =
|
||||
|
|
|
@ -501,6 +501,8 @@ public class TestTaskAttemptListenerImpl {
|
|||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
|
||||
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
|
||||
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 1);
|
||||
tal.init(conf);
|
||||
tal.start();
|
||||
|
||||
|
|
|
@ -271,6 +271,7 @@ public class TestFail {
|
|||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
|
||||
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
|
||||
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -63,6 +63,7 @@ public class TestTaskHeartbeatHandler {
|
|||
// so that TASK_TIMEOUT is not overridden
|
||||
conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5);
|
||||
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
|
||||
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
|
||||
|
||||
hb.init(conf);
|
||||
hb.start();
|
||||
|
@ -205,6 +206,7 @@ public class TestTaskHeartbeatHandler {
|
|||
new TaskHeartbeatHandler(mockHandler, clock, 1);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
|
||||
conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01);
|
||||
hb.init(conf);
|
||||
hb.start();
|
||||
try {
|
||||
|
|
|
@ -372,6 +372,29 @@ public interface MRJobConfig {
|
|||
|
||||
public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000;
|
||||
|
||||
/**
|
||||
* TaskAttemptListenerImpl will log the task progress when the delta progress
|
||||
* is larger than or equal the defined value.
|
||||
* The double value has to be between 0, and 1 with two decimals.
|
||||
*/
|
||||
String TASK_LOG_PROGRESS_DELTA_THRESHOLD =
|
||||
"mapreduce.task.log.progress.delta.threshold";
|
||||
/**
|
||||
* Default delta progress is set to 5%.
|
||||
*/
|
||||
double TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT = 0.05;
|
||||
/**
|
||||
* TaskAttemptListenerImpl will log the task progress when the defined value
|
||||
* in seconds expires.
|
||||
* This helps to debug task attempts that are doing very slow progress.
|
||||
*/
|
||||
String TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS =
|
||||
"mapreduce.task.log.progress.wait.interval-seconds";
|
||||
/**
|
||||
* Default period to log the task attempt progress is 60 seconds.
|
||||
*/
|
||||
long TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT = 60L;
|
||||
|
||||
public static final String TASK_ID = "mapreduce.task.id";
|
||||
|
||||
public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.mapreduce.util;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
|
||||
|
@ -58,4 +59,75 @@ public final class MRJobConfUtil {
|
|||
}
|
||||
|
||||
public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f;
|
||||
|
||||
/**
|
||||
* Configurations to control the frequency of logging of task Attempt.
|
||||
*/
|
||||
public static final double PROGRESS_MIN_DELTA_FACTOR = 100.0;
|
||||
private static volatile Double progressMinDeltaThreshold = null;
|
||||
private static volatile Long progressMaxWaitDeltaTimeThreshold = null;
|
||||
|
||||
/**
|
||||
* load the values defined from a configuration file including the delta
|
||||
* progress and the maximum time between each log message.
|
||||
* @param conf
|
||||
*/
|
||||
public static void setTaskLogProgressDeltaThresholds(
|
||||
final Configuration conf) {
|
||||
if (progressMinDeltaThreshold == null) {
|
||||
progressMinDeltaThreshold =
|
||||
new Double(PROGRESS_MIN_DELTA_FACTOR
|
||||
* conf.getDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD,
|
||||
MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT));
|
||||
}
|
||||
|
||||
if (progressMaxWaitDeltaTimeThreshold == null) {
|
||||
progressMaxWaitDeltaTimeThreshold =
|
||||
TimeUnit.SECONDS.toMillis(conf
|
||||
.getLong(
|
||||
MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS,
|
||||
MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the min delta progress required to log the task attempt current
|
||||
* progress.
|
||||
* @return the defined threshold in the conf.
|
||||
* returns the default value if
|
||||
* {@link #setTaskLogProgressDeltaThresholds} has not been called.
|
||||
*/
|
||||
public static double getTaskProgressMinDeltaThreshold() {
|
||||
if (progressMinDeltaThreshold == null) {
|
||||
return PROGRESS_MIN_DELTA_FACTOR
|
||||
* MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT;
|
||||
}
|
||||
return progressMinDeltaThreshold.doubleValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the min time required to log the task attempt current
|
||||
* progress.
|
||||
* @return the defined threshold in the conf.
|
||||
* returns the default value if
|
||||
* {@link #setTaskLogProgressDeltaThresholds} has not been called.
|
||||
*/
|
||||
public static long getTaskProgressWaitDeltaTimeThreshold() {
|
||||
if (progressMaxWaitDeltaTimeThreshold == null) {
|
||||
return TimeUnit.SECONDS.toMillis(
|
||||
MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT);
|
||||
}
|
||||
return progressMaxWaitDeltaTimeThreshold.longValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Coverts a progress between 0.0 to 1.0 to double format used to log the
|
||||
* task attempt.
|
||||
* @param progress of the task which is a value between 0.0 and 1.0.
|
||||
* @return the double value that is less than or equal to the argument
|
||||
* multiplied by {@link #PROGRESS_MIN_DELTA_FACTOR}.
|
||||
*/
|
||||
public static double convertTaskProgressToFactor(final float progress) {
|
||||
return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue