MAPREDUCE-7053: Timed out tasks can fail to produce thread dump. Contributed by Jason Lowe.

This commit is contained in:
Eric Payne 2018-02-16 08:15:09 -06:00
parent a1e05e0292
commit 82f029f7b5
4 changed files with 145 additions and 29 deletions

View File

@ -370,17 +370,22 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
AMFeedback feedback = new AMFeedback(); AMFeedback feedback = new AMFeedback();
feedback.setTaskFound(true);
AtomicReference<TaskAttemptStatus> lastStatusRef = AtomicReference<TaskAttemptStatus> lastStatusRef =
attemptIdToStatus.get(yarnAttemptID); attemptIdToStatus.get(yarnAttemptID);
if (lastStatusRef == null) { if (lastStatusRef == null) {
LOG.error("Status update was called with illegal TaskAttemptId: " // The task is not known, but it could be in the process of tearing
+ yarnAttemptID); // down gracefully or receiving a thread dump signal. Tolerate unknown
feedback.setTaskFound(false); // tasks as long as they have unregistered recently.
if (!taskHeartbeatHandler.hasRecentlyUnregistered(yarnAttemptID)) {
LOG.error("Status update was called with illegal TaskAttemptId: "
+ yarnAttemptID);
feedback.setTaskFound(false);
}
return feedback; return feedback;
} }
feedback.setTaskFound(true);
// Propagating preemption to the task if TASK_PREEMPTION is enabled // Propagating preemption to the task if TASK_PREEMPTION is enabled
if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false) if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
&& preemptionPolicy.isPreempted(yarnAttemptID)) { && preemptionPolicy.isPreempted(yarnAttemptID)) {

View File

@ -71,12 +71,14 @@ public synchronized long getLastProgress() {
private Thread lostTaskCheckerThread; private Thread lostTaskCheckerThread;
private volatile boolean stopped; private volatile boolean stopped;
private long taskTimeOut; private long taskTimeOut;
private long unregisterTimeOut;
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
private final EventHandler eventHandler; private final EventHandler eventHandler;
private final Clock clock; private final Clock clock;
private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts; private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
private ConcurrentMap<TaskAttemptId, ReportTime> recentlyUnregisteredAttempts;
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock, public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
int numThreads) { int numThreads) {
@ -85,6 +87,8 @@ public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
this.clock = clock; this.clock = clock;
runningAttempts = runningAttempts =
new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads); new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
recentlyUnregisteredAttempts =
new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
} }
@Override @Override
@ -92,6 +96,8 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
taskTimeOut = conf.getLong( taskTimeOut = conf.getLong(
MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
// enforce task timeout is at least twice as long as task report interval // enforce task timeout is at least twice as long as task report interval
long taskProgressReportIntervalMillis = MRJobConfUtil. long taskProgressReportIntervalMillis = MRJobConfUtil.
@ -140,6 +146,12 @@ public void register(TaskAttemptId attemptID) {
public void unregister(TaskAttemptId attemptID) { public void unregister(TaskAttemptId attemptID) {
runningAttempts.remove(attemptID); runningAttempts.remove(attemptID);
recentlyUnregisteredAttempts.put(attemptID,
new ReportTime(clock.getTime()));
}
/**
 * Reports whether the given attempt currently has an entry in
 * {@code recentlyUnregisteredAttempts}.
 *
 * @param attemptID the task attempt to look up
 * @return {@code true} if the attempt is present in the
 *         recently-unregistered map, {@code false} otherwise
 */
public boolean hasRecentlyUnregistered(TaskAttemptId attemptID) {
return recentlyUnregisteredAttempts.containsKey(attemptID);
} }
private class PingChecker implements Runnable { private class PingChecker implements Runnable {
@ -147,27 +159,9 @@ private class PingChecker implements Runnable {
@Override @Override
public void run() { public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
runningAttempts.entrySet().iterator();
// avoid calculating current time everytime in loop
long currentTime = clock.getTime(); long currentTime = clock.getTime();
checkRunning(currentTime);
while (iterator.hasNext()) { checkRecentlyUnregistered(currentTime);
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
boolean taskTimedOut = (taskTimeOut > 0) &&
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
if(taskTimedOut) {
// task is lost, remove from the list and raise lost event
iterator.remove();
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
.getKey(), "AttemptID:" + entry.getKey().toString()
+ " Timed out after " + taskTimeOut / 1000 + " secs"));
eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
TaskAttemptEventType.TA_TIMED_OUT));
}
}
try { try {
Thread.sleep(taskTimeOutCheckInterval); Thread.sleep(taskTimeOutCheckInterval);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -176,6 +170,39 @@ public void run() {
} }
} }
} }
/**
 * Walks the running attempts and expires every attempt whose last reported
 * progress plus {@code taskTimeOut} lies strictly before {@code currentTime}.
 * A non-positive {@code taskTimeOut} means no attempt is ever expired.
 *
 * For each expired attempt the entry is removed from the running map first,
 * then a diagnostics update event and a TA_TIMED_OUT event are passed to the
 * event handler, in that order.
 *
 * @param currentTime the time, in the clock's units, to compare against
 */
private void checkRunning(long currentTime) {
  for (Iterator<Map.Entry<TaskAttemptId, ReportTime>> attempts =
      runningAttempts.entrySet().iterator(); attempts.hasNext();) {
    Map.Entry<TaskAttemptId, ReportTime> attempt = attempts.next();
    if (taskTimeOut <= 0) {
      // timeouts are disabled; nothing can expire
      continue;
    }
    long deadline = attempt.getValue().getLastProgress() + taskTimeOut;
    if (currentTime <= deadline) {
      // still within its allowed silence window
      continue;
    }
    // task is lost: stop tracking it, then report why and raise the event
    attempts.remove();
    TaskAttemptId lostAttempt = attempt.getKey();
    String diagnostic = "AttemptID:" + lostAttempt.toString()
        + " Timed out after " + taskTimeOut / 1000 + " secs";
    eventHandler.handle(
        new TaskAttemptDiagnosticsUpdateEvent(lostAttempt, diagnostic));
    eventHandler.handle(
        new TaskAttemptEvent(attempt.getKey(),
            TaskAttemptEventType.TA_TIMED_OUT));
  }
}
/**
 * Prunes {@code recentlyUnregisteredAttempts}: an entry is removed once its
 * recorded time plus {@code unregisterTimeOut} lies strictly before
 * {@code currentTime}.
 *
 * @param currentTime the time, in the clock's units, to compare against
 */
private void checkRecentlyUnregistered(long currentTime) {
  for (Iterator<ReportTime> timestamps =
      recentlyUnregisteredAttempts.values().iterator();
      timestamps.hasNext();) {
    long expiry = timestamps.next().getLastProgress() + unregisterTimeOut;
    if (expiry < currentTime) {
      timestamps.remove();
    }
  }
}
} }
@VisibleForTesting @VisibleForTesting

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import com.google.common.base.Supplier;
import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.checkpoint.EnumCounter; import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
@ -51,11 +52,13 @@
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -488,14 +491,57 @@ public void testCoalescedStatusUpdatesCleared()
} }
@Test @Test
public void testStatusUpdateFromUnregisteredTask() public void testStatusUpdateFromUnregisteredTask() throws Exception {
throws IOException, InterruptedException{
configureMocks(); configureMocks();
startListener(false); ControlledClock clock = new ControlledClock();
clock.setTime(0);
doReturn(clock).when(appCtx).getClock();
AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus); final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx,
secret, rmHeartbeatHandler, policy) {
@Override
protected void startRpcServer() {
// Empty
}
@Override
protected void stopRpcServer() {
// Empty
}
};
Configuration conf = new Configuration();
conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
tal.init(conf);
tal.start();
AMFeedback feedback = tal.statusUpdate(attemptID, firstReduceStatus);
assertFalse(feedback.getTaskFound()); assertFalse(feedback.getTaskFound());
tal.registerPendingTask(task, wid);
tal.registerLaunchedTask(attemptId, wid);
feedback = tal.statusUpdate(attemptID, firstReduceStatus);
assertTrue(feedback.getTaskFound());
// verify attempt is still reported as found if recently unregistered
tal.unregister(attemptId, wid);
feedback = tal.statusUpdate(attemptID, firstReduceStatus);
assertTrue(feedback.getTaskFound());
// verify attempt is not found if not recently unregistered
long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
clock.setTime(unregisterTimeout + 1);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
AMFeedback response =
tal.statusUpdate(attemptID, firstReduceStatus);
return !response.getTaskFound();
} catch (Exception e) {
throw new RuntimeException("status update failed", e);
}
}
}, 10, 10000);
} }
private void configureMocks() { private void configureMocks() {

View File

@ -23,6 +23,7 @@
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -30,10 +31,12 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -105,6 +108,41 @@ public void testTaskTimeoutConfigWithoutTaskProgressReportInterval() {
verifyTaskTimeoutConfig(conf, expectedTimeout); verifyTaskTimeoutConfig(conf, expectedTimeout);
} }
/**
 * Checks the recently-unregistered bookkeeping of the heartbeat handler: an
 * attempt is not reported before or after registration, is reported right
 * after it is unregistered, and stops being reported once the controlled
 * clock has been moved past the configured task exit timeout.
 */
@Test
public void testTaskUnregistered() throws Exception {
  // A controlled clock lets the test jump past the exit timeout instantly.
  ControlledClock testClock = new ControlledClock();
  testClock.setTime(0);
  EventHandler handler = mock(EventHandler.class);
  final TaskHeartbeatHandler heartbeats =
      new TaskHeartbeatHandler(handler, testClock, 1);
  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
  heartbeats.init(conf);
  heartbeats.start();
  try {
    ApplicationId application = ApplicationId.newInstance(0L, 5);
    JobId job = MRBuilderUtils.newJobId(application, 4);
    TaskId mapTask = MRBuilderUtils.newTaskId(job, 3, TaskType.MAP);
    final TaskAttemptId attempt =
        MRBuilderUtils.newTaskAttemptId(mapTask, 2);

    // unknown and merely registered attempts are never "recently unregistered"
    Assert.assertFalse(heartbeats.hasRecentlyUnregistered(attempt));
    heartbeats.register(attempt);
    Assert.assertFalse(heartbeats.hasRecentlyUnregistered(attempt));

    // unregistering makes the attempt visible as recently unregistered
    heartbeats.unregister(attempt);
    Assert.assertTrue(heartbeats.hasRecentlyUnregistered(attempt));

    // move the clock just beyond the exit timeout and wait for the entry to go
    long exitTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
        MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
    testClock.setTime(exitTimeout + 1);
    Supplier<Boolean> attemptForgotten = new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !heartbeats.hasRecentlyUnregistered(attempt);
      }
    };
    GenericTestUtils.waitFor(attemptForgotten, 10, 10000);
  } finally {
    heartbeats.stop();
  }
}
/** /**
* Test if task timeout is set properly in response to the configuration of * Test if task timeout is set properly in response to the configuration of
* the task progress report interval. * the task progress report interval.