MAPREDUCE-7053: Timed out tasks can fail to produce thread dump. Contributed by Jason Lowe.
(cherry picked from commit 904a6bf263
)
This commit is contained in:
parent
01fc51fca7
commit
ebf183fa56
|
@ -342,10 +342,16 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|||
AtomicReference<TaskAttemptStatus> lastStatusRef =
|
||||
attemptIdToStatus.get(yarnAttemptID);
|
||||
if (lastStatusRef == null) {
|
||||
// The task is not known, but it could be in the process of tearing
|
||||
// down gracefully or receiving a thread dump signal. Tolerate unknown
|
||||
// tasks as long as they have unregistered recently.
|
||||
if (!taskHeartbeatHandler.hasRecentlyUnregistered(yarnAttemptID)) {
|
||||
LOG.error("Status update was called with illegal TaskAttemptId: "
|
||||
+ yarnAttemptID);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
taskHeartbeatHandler.progressing(yarnAttemptID);
|
||||
TaskAttemptStatus taskAttemptStatus =
|
||||
|
|
|
@ -70,12 +70,14 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|||
private Thread lostTaskCheckerThread;
|
||||
private volatile boolean stopped;
|
||||
private long taskTimeOut;
|
||||
private long unregisterTimeOut;
|
||||
private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
|
||||
|
||||
private final EventHandler eventHandler;
|
||||
private final Clock clock;
|
||||
|
||||
private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts;
|
||||
private ConcurrentMap<TaskAttemptId, ReportTime> recentlyUnregisteredAttempts;
|
||||
|
||||
public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
|
||||
int numThreads) {
|
||||
|
@ -84,6 +86,8 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|||
this.clock = clock;
|
||||
runningAttempts =
|
||||
new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
|
||||
recentlyUnregisteredAttempts =
|
||||
new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,6 +95,8 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|||
super.serviceInit(conf);
|
||||
taskTimeOut = conf.getLong(
|
||||
MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS);
|
||||
unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
|
||||
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
|
||||
|
||||
// enforce task timeout is at least twice as long as task report interval
|
||||
long taskProgressReportIntervalMillis = MRJobConfUtil.
|
||||
|
@ -139,6 +145,12 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|||
|
||||
public void unregister(TaskAttemptId attemptID) {
|
||||
runningAttempts.remove(attemptID);
|
||||
recentlyUnregisteredAttempts.put(attemptID,
|
||||
new ReportTime(clock.getTime()));
|
||||
}
|
||||
|
||||
public boolean hasRecentlyUnregistered(TaskAttemptId attemptID) {
|
||||
return recentlyUnregisteredAttempts.containsKey(attemptID);
|
||||
}
|
||||
|
||||
private class PingChecker implements Runnable {
|
||||
|
@ -146,12 +158,22 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|||
@Override
|
||||
public void run() {
|
||||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||
long currentTime = clock.getTime();
|
||||
checkRunning(currentTime);
|
||||
checkRecentlyUnregistered(currentTime);
|
||||
try {
|
||||
Thread.sleep(taskTimeOutCheckInterval);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("TaskHeartbeatHandler thread interrupted");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkRunning(long currentTime) {
|
||||
Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator =
|
||||
runningAttempts.entrySet().iterator();
|
||||
|
||||
// avoid calculating current time everytime in loop
|
||||
long currentTime = clock.getTime();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
|
||||
boolean taskTimedOut = (taskTimeOut > 0) &&
|
||||
|
@ -167,11 +189,16 @@ public class TaskHeartbeatHandler extends AbstractService {
|
|||
TaskAttemptEventType.TA_TIMED_OUT));
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(taskTimeOutCheckInterval);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("TaskHeartbeatHandler thread interrupted");
|
||||
break;
|
||||
}
|
||||
|
||||
private void checkRecentlyUnregistered(long currentTime) {
|
||||
Iterator<ReportTime> iterator =
|
||||
recentlyUnregisteredAttempts.values().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
ReportTime unregisteredTime = iterator.next();
|
||||
if (currentTime >
|
||||
unregisteredTime.getLastProgress() + unregisterTimeOut) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,12 +17,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
|
@ -51,11 +54,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -398,14 +403,52 @@ public class TestTaskAttemptListenerImpl {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testStatusUpdateFromUnregisteredTask()
|
||||
throws IOException, InterruptedException{
|
||||
public void testStatusUpdateFromUnregisteredTask() throws Exception {
|
||||
configureMocks();
|
||||
startListener(false);
|
||||
ControlledClock clock = new ControlledClock();
|
||||
clock.setTime(0);
|
||||
doReturn(clock).when(appCtx).getClock();
|
||||
|
||||
boolean taskFound = listener.statusUpdate(attemptID, firstReduceStatus);
|
||||
final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx,
|
||||
secret, rmHeartbeatHandler, null) {
|
||||
@Override
|
||||
protected void startRpcServer() {
|
||||
// Empty
|
||||
}
|
||||
@Override
|
||||
protected void stopRpcServer() {
|
||||
// Empty
|
||||
}
|
||||
};
|
||||
|
||||
assertFalse(taskFound);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
|
||||
tal.init(conf);
|
||||
tal.start();
|
||||
|
||||
assertFalse(tal.statusUpdate(attemptID, firstReduceStatus));
|
||||
tal.registerPendingTask(task, wid);
|
||||
tal.registerLaunchedTask(attemptId, wid);
|
||||
assertTrue(tal.statusUpdate(attemptID, firstReduceStatus));
|
||||
|
||||
// verify attempt is still reported as found if recently unregistered
|
||||
tal.unregister(attemptId, wid);
|
||||
assertTrue(tal.statusUpdate(attemptID, firstReduceStatus));
|
||||
|
||||
// verify attempt is not found if not recently unregistered
|
||||
long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
|
||||
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
|
||||
clock.setTime(unregisterTimeout + 1);
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
try {
|
||||
return !tal.statusUpdate(attemptID, firstReduceStatus);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("status update failed", e);
|
||||
}
|
||||
}
|
||||
}, 10, 10000);
|
||||
}
|
||||
|
||||
private void configureMocks() {
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
|
@ -30,10 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -105,6 +108,41 @@ public class TestTaskHeartbeatHandler {
|
|||
verifyTaskTimeoutConfig(conf, expectedTimeout);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskUnregistered() throws Exception {
|
||||
EventHandler mockHandler = mock(EventHandler.class);
|
||||
ControlledClock clock = new ControlledClock();
|
||||
clock.setTime(0);
|
||||
final TaskHeartbeatHandler hb =
|
||||
new TaskHeartbeatHandler(mockHandler, clock, 1);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1);
|
||||
hb.init(conf);
|
||||
hb.start();
|
||||
try {
|
||||
ApplicationId appId = ApplicationId.newInstance(0l, 5);
|
||||
JobId jobId = MRBuilderUtils.newJobId(appId, 4);
|
||||
TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
|
||||
final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
|
||||
Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
|
||||
hb.register(taid);
|
||||
Assert.assertFalse(hb.hasRecentlyUnregistered(taid));
|
||||
hb.unregister(taid);
|
||||
Assert.assertTrue(hb.hasRecentlyUnregistered(taid));
|
||||
long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT,
|
||||
MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
|
||||
clock.setTime(unregisterTimeout + 1);
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return !hb.hasRecentlyUnregistered(taid);
|
||||
}
|
||||
}, 10, 10000);
|
||||
} finally {
|
||||
hb.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if task timeout is set properly in response to the configuration of
|
||||
* the task progress report interval.
|
||||
|
|
Loading…
Reference in New Issue