merge -r 1308530:1308531 from trunk. FIXES: MAPREDUCE-4089

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1308533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-04-02 20:24:46 +00:00
parent 54a0b26231
commit ad9f93f7b5
5 changed files with 142 additions and 35 deletions

View File

@ -114,6 +114,8 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4092. commitJob Exception does not fail job (Jon Eagles via MAPREDUCE-4092. commitJob Exception does not fail job (Jon Eagles via
bobby) bobby)
MAPREDUCE-4089. Hung Tasks never time out. (Robert Evans via tgraves)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -175,7 +175,7 @@ public class TaskAttemptListenerImpl extends CompositeService
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.receivedPing(attemptID); taskHeartbeatHandler.progressing(attemptID);
Job job = context.getJob(attemptID.getTaskId().getJobId()); Job job = context.getJob(attemptID.getTaskId().getJobId());
Task task = job.getTask(attemptID.getTaskId()); Task task = job.getTask(attemptID.getTaskId());
@ -203,7 +203,7 @@ public class TaskAttemptListenerImpl extends CompositeService
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.receivedPing(attemptID); taskHeartbeatHandler.progressing(attemptID);
//Ignorable TaskStatus? - since a task will send a LastStatusUpdate //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
context.getEventHandler().handle( context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, new TaskAttemptEvent(attemptID,
@ -217,7 +217,7 @@ public class TaskAttemptListenerImpl extends CompositeService
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.receivedPing(attemptID); taskHeartbeatHandler.progressing(attemptID);
context.getEventHandler().handle( context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@ -270,7 +270,7 @@ public class TaskAttemptListenerImpl extends CompositeService
context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents( context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
fromEventId, maxEvents); fromEventId, maxEvents);
taskHeartbeatHandler.receivedPing(attemptID); taskHeartbeatHandler.progressing(attemptID);
// filter the events to return only map completion events in old format // filter the events to return only map completion events in old format
List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>(); List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
@ -287,7 +287,7 @@ public class TaskAttemptListenerImpl extends CompositeService
@Override @Override
public boolean ping(TaskAttemptID taskAttemptID) throws IOException { public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Ping from " + taskAttemptID.toString()); LOG.info("Ping from " + taskAttemptID.toString());
taskHeartbeatHandler.receivedPing(TypeConverter.toYarn(taskAttemptID)); taskHeartbeatHandler.pinged(TypeConverter.toYarn(taskAttemptID));
return true; return true;
} }
@ -299,7 +299,7 @@ public class TaskAttemptListenerImpl extends CompositeService
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.receivedPing(attemptID); taskHeartbeatHandler.progressing(attemptID);
// This is mainly used for cases where we want to propagate exception traces // This is mainly used for cases where we want to propagate exception traces
// of tasks that fail. // of tasks that fail.
@ -317,7 +317,7 @@ public class TaskAttemptListenerImpl extends CompositeService
LOG.info("Status update from " + taskAttemptID.toString()); LOG.info("Status update from " + taskAttemptID.toString());
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
taskHeartbeatHandler.receivedPing(yarnAttemptID); taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus = TaskAttemptStatus taskAttemptStatus =
new TaskAttemptStatus(); new TaskAttemptStatus();
taskAttemptStatus.id = yarnAttemptID; taskAttemptStatus.id = yarnAttemptID;

View File

@ -45,7 +45,34 @@ import org.apache.hadoop.yarn.service.AbstractService;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class TaskHeartbeatHandler extends AbstractService { public class TaskHeartbeatHandler extends AbstractService {
private static class ReportTime {
private long lastPing;
private long lastProgress;
public ReportTime(long time) {
setLastProgress(time);
}
public synchronized void setLastPing(long time) {
lastPing = time;
}
public synchronized void setLastProgress(long time) {
lastProgress = time;
lastPing = time;
}
public synchronized long getLastPing() {
return lastPing;
}
public synchronized long getLastProgress() {
return lastProgress;
}
}
private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class); private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
private static final int PING_TIMEOUT = 5 * 60 * 1000;
//thread which runs periodically to see the last time since a heartbeat is //thread which runs periodically to see the last time since a heartbeat is
//received from a task. //received from a task.
@ -57,7 +84,7 @@ public class TaskHeartbeatHandler extends AbstractService {
private final EventHandler eventHandler; private final EventHandler eventHandler;
private final Clock clock; private final Clock clock;
private ConcurrentMap<TaskAttemptId, Long> runningAttempts; private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock, public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
int numThreads) { int numThreads) {
@ -65,7 +92,7 @@ public class TaskHeartbeatHandler extends AbstractService {
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
this.clock = clock; this.clock = clock;
runningAttempts = runningAttempts =
new ConcurrentHashMap<TaskAttemptId, Long>(16, 0.75f, numThreads); new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
} }
@Override @Override
@ -91,14 +118,26 @@ public class TaskHeartbeatHandler extends AbstractService {
super.stop(); super.stop();
} }
public void receivedPing(TaskAttemptId attemptID) { public void progressing(TaskAttemptId attemptID) {
//only put for the registered attempts //only put for the registered attempts
//TODO throw an exception if the task isn't registered. //TODO throw an exception if the task isn't registered.
runningAttempts.replace(attemptID, clock.getTime()); ReportTime time = runningAttempts.get(attemptID);
if(time != null) {
time.setLastProgress(clock.getTime());
}
}
public void pinged(TaskAttemptId attemptID) {
//only put for the registered attempts
//TODO throw an exception if the task isn't registered.
ReportTime time = runningAttempts.get(attemptID);
if(time != null) {
time.setLastPing(clock.getTime());
}
} }
public void register(TaskAttemptId attemptID) { public void register(TaskAttemptId attemptID) {
runningAttempts.put(attemptID, clock.getTime()); runningAttempts.put(attemptID, new ReportTime(clock.getTime()));
} }
public void unregister(TaskAttemptId attemptID) { public void unregister(TaskAttemptId attemptID) {
@ -110,21 +149,20 @@ public class TaskHeartbeatHandler extends AbstractService {
@Override @Override
public void run() { public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
Iterator<Map.Entry<TaskAttemptId, Long>> iterator = Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
runningAttempts.entrySet().iterator(); runningAttempts.entrySet().iterator();
// avoid calculating current time everytime in loop // avoid calculating current time everytime in loop
long currentTime = clock.getTime(); long currentTime = clock.getTime();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Map.Entry<TaskAttemptId, Long> entry = iterator.next(); Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
if (currentTime > entry.getValue() + taskTimeOut) { boolean taskTimedOut = (taskTimeOut > 0) &&
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
boolean pingTimedOut =
(currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT));
//In case the iterator isn't picking up the latest. if(taskTimedOut || pingTimedOut) {
// Extra lookup outside of the iterator - but only if the task
// is considered to be timed out.
Long taskTime = runningAttempts.get(entry.getKey());
if (taskTime != null && currentTime > taskTime + taskTimeOut) {
// task is lost, remove from the list and raise lost event // task is lost, remove from the list and raise lost event
iterator.remove(); iterator.remove();
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
@ -133,8 +171,6 @@ public class TaskHeartbeatHandler extends AbstractService {
eventHandler.handle(new TaskAttemptEvent(entry.getKey(), eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
TaskAttemptEventType.TA_TIMED_OUT)); TaskAttemptEventType.TA_TIMED_OUT));
} }
}
} }
try { try {
Thread.sleep(taskTimeOutCheckInterval); Thread.sleep(taskTimeOutCheckInterval);

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import static org.mockito.Mockito.*;
public class TestTaskHeartbeatHandler {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testTimeout() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class);
Clock clock = new SystemClock();
TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); //10 ms
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
hb.init(conf);
hb.start();
try {
ApplicationId appId = BuilderUtils.newApplicationId(0l, 5);
JobId jobId = MRBuilderUtils.newJobId(appId, 4);
TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
hb.register(taid);
Thread.sleep(100);
//Events only happen when the task is canceled
verify(mockHandler, times(2)).handle(any(Event.class));
} finally {
hb.stop();
}
}
}

View File

@ -351,7 +351,7 @@
<value>600000</value> <value>600000</value>
<description>The number of milliseconds before a task will be <description>The number of milliseconds before a task will be
terminated if it neither reads an input, writes an output, nor terminated if it neither reads an input, writes an output, nor
updates its status string. updates its status string. A value of 0 disables the timeout.
</description> </description>
</property> </property>