From b432cf076ef5a0ca962d536bd04d96654c21bdb4 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Wed, 28 Nov 2012 19:24:27 +0000 Subject: [PATCH] merge -r 1414872:1414873 from trunk. FIXES: MAPREDUCE-4817 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1414876 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../mapred/TaskAttemptListenerImpl.java | 1 - .../v2/app/TaskHeartbeatHandler.java | 27 +++---------------- 3 files changed, 6 insertions(+), 25 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5eba17c1709..a30050ec451 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -452,6 +452,9 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4825. JobImpl.finished doesn't expect ERROR as a final job state (jlowe via bobby) + MAPREDUCE-4817. Hardcoded task ping timeout kills tasks localizing large + amounts of data (tgraves) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index ab084cfcbd7..e9f890a7f00 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -274,7 +274,6 @@ public class TaskAttemptListenerImpl extends CompositeService @Override public boolean ping(TaskAttemptID taskAttemptID) throws IOException { LOG.info("Ping from " + taskAttemptID.toString()); - taskHeartbeatHandler.pinged(TypeConverter.toYarn(taskAttemptID)); return true; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index b83f663836d..153b5a8d6d9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -46,33 +46,22 @@ import org.apache.hadoop.yarn.service.AbstractService; public class TaskHeartbeatHandler extends AbstractService { private static class ReportTime { - private long lastPing; private long lastProgress; public ReportTime(long time) { setLastProgress(time); } - public synchronized void setLastPing(long time) { - lastPing = time; - } - public synchronized void setLastProgress(long time) { lastProgress = time; - lastPing = time; } - - public synchronized long getLastPing() { - return lastPing; - } - + public synchronized long getLastProgress() { return lastProgress; } } private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class); - private static final int PING_TIMEOUT = 5 * 60 * 1000; //thread which runs periodically to see the last time since a heartbeat is //received from a task. @@ -127,14 +116,6 @@ public class TaskHeartbeatHandler extends AbstractService { } } - public void pinged(TaskAttemptId attemptID) { - //only put for the registered attempts - //TODO throw an exception if the task isn't registered. - ReportTime time = runningAttempts.get(attemptID); - if(time != null) { - time.setLastPing(clock.getTime()); - } - } public void register(TaskAttemptId attemptID) { runningAttempts.put(attemptID, new ReportTime(clock.getTime())); @@ -159,10 +140,8 @@ public class TaskHeartbeatHandler extends AbstractService { Map.Entry entry = iterator.next(); boolean taskTimedOut = (taskTimeOut > 0) && (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); - boolean pingTimedOut = - (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT)); - - if(taskTimedOut || pingTimedOut) { + + if(taskTimedOut) { // task is lost, remove from the list and raise lost event iterator.remove(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry