merge -r 1414872:1414873 from trunk. FIXES: MAPREDUCE-4817

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1414876 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-11-28 19:24:27 +00:00
parent 3e0d0cf08f
commit b432cf076e
3 changed files with 6 additions and 25 deletions

View File

@ -452,6 +452,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4825. JobImpl.finished doesn't expect ERROR as a final job state MAPREDUCE-4825. JobImpl.finished doesn't expect ERROR as a final job state
(jlowe via bobby) (jlowe via bobby)
MAPREDUCE-4817. Hardcoded task ping timeout kills tasks localizing large
amounts of data (tgraves)
Release 0.23.5 - UNRELEASED Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -274,7 +274,6 @@ public class TaskAttemptListenerImpl extends CompositeService
@Override @Override
public boolean ping(TaskAttemptID taskAttemptID) throws IOException { public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Ping from " + taskAttemptID.toString()); LOG.info("Ping from " + taskAttemptID.toString());
taskHeartbeatHandler.pinged(TypeConverter.toYarn(taskAttemptID));
return true; return true;
} }

View File

@ -46,24 +46,14 @@ import org.apache.hadoop.yarn.service.AbstractService;
public class TaskHeartbeatHandler extends AbstractService { public class TaskHeartbeatHandler extends AbstractService {
private static class ReportTime { private static class ReportTime {
private long lastPing;
private long lastProgress; private long lastProgress;
public ReportTime(long time) { public ReportTime(long time) {
setLastProgress(time); setLastProgress(time);
} }
public synchronized void setLastPing(long time) {
lastPing = time;
}
public synchronized void setLastProgress(long time) { public synchronized void setLastProgress(long time) {
lastProgress = time; lastProgress = time;
lastPing = time;
}
public synchronized long getLastPing() {
return lastPing;
} }
public synchronized long getLastProgress() { public synchronized long getLastProgress() {
@ -72,7 +62,6 @@ public class TaskHeartbeatHandler extends AbstractService {
} }
private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class); private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
private static final int PING_TIMEOUT = 5 * 60 * 1000;
//thread which runs periodically to see the last time since a heartbeat is //thread which runs periodically to see the last time since a heartbeat is
//received from a task. //received from a task.
@ -127,14 +116,6 @@ public class TaskHeartbeatHandler extends AbstractService {
} }
} }
public void pinged(TaskAttemptId attemptID) {
//only put for the registered attempts
//TODO throw an exception if the task isn't registered.
ReportTime time = runningAttempts.get(attemptID);
if(time != null) {
time.setLastPing(clock.getTime());
}
}
public void register(TaskAttemptId attemptID) { public void register(TaskAttemptId attemptID) {
runningAttempts.put(attemptID, new ReportTime(clock.getTime())); runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
@ -159,10 +140,8 @@ public class TaskHeartbeatHandler extends AbstractService {
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next(); Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
boolean taskTimedOut = (taskTimeOut > 0) && boolean taskTimedOut = (taskTimeOut > 0) &&
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
boolean pingTimedOut =
(currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
if(taskTimedOut || pingTimedOut) { if(taskTimedOut) {
// task is lost, remove from the list and raise lost event // task is lost, remove from the list and raise lost event
iterator.remove(); iterator.remove();
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry