MAPREDUCE-7198. mapreduce.task.timeout=0 configuration used to disable timeout doesn't work.

This commit is contained in:
Akira Ajisaka 2019-05-23 10:21:11 +09:00
parent 9c61494c02
commit 5565f2c532
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
3 changed files with 46 additions and 3 deletions

View File

@ -192,7 +192,8 @@ public class TaskHeartbeatHandler extends AbstractService {
(currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
// when container in NM not started in a long time, // when container in NM not started in a long time,
// we think the taskAttempt is stuck // we think the taskAttempt is stuck
boolean taskStuck = (!entry.getValue().isReported()) && boolean taskStuck = (taskStuckTimeOut > 0) &&
(!entry.getValue().isReported()) &&
(currentTime > (currentTime >
(entry.getValue().getLastProgress() + taskStuckTimeOut)); (entry.getValue().getLastProgress() + taskStuckTimeOut));
@ -225,7 +226,7 @@ public class TaskHeartbeatHandler extends AbstractService {
} }
@VisibleForTesting @VisibleForTesting
ConcurrentMap getRunningAttempts(){ ConcurrentMap<TaskAttemptId, ReportTime> getRunningAttempts(){
return runningAttempts; return runningAttempts;
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -48,7 +49,7 @@ import java.util.concurrent.ConcurrentMap;
public class TestTaskHeartbeatHandler { public class TestTaskHeartbeatHandler {
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings("unchecked")
@Test @Test
public void testTaskTimeout() throws InterruptedException { public void testTaskTimeout() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class); EventHandler mockHandler = mock(EventHandler.class);
@ -81,6 +82,46 @@ public class TestTaskHeartbeatHandler {
} }
} }
@Test
@SuppressWarnings("unchecked")
public void testTaskTimeoutDisable() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);
Configuration conf = new Configuration();
conf.setLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, 0); // no timeout
conf.setInt(MRJobConfig.TASK_TIMEOUT, 0); // no timeout
// set TASK_PROGRESS_REPORT_INTERVAL to a value smaller than TASK_TIMEOUT
// so that TASK_TIMEOUT is not overridden
conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0);
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
hb.init(conf);
hb.start();
try {
ApplicationId appId = ApplicationId.newInstance(0L, 5);
JobId jobId = MRBuilderUtils.newJobId(appId, 4);
TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
hb.register(taid);
ConcurrentMap<TaskAttemptId, TaskHeartbeatHandler.ReportTime>
runningAttempts = hb.getRunningAttempts();
for (Map.Entry<TaskAttemptId, TaskHeartbeatHandler.ReportTime> entry
: runningAttempts.entrySet()) {
assertFalse(entry.getValue().isReported());
}
Thread.sleep(100);
// Timeout is disabled, so the task should not be canceled
verify(mockHandler, never()).handle(any(Event.class));
} finally {
hb.stop();
}
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testTaskStuck() throws InterruptedException { public void testTaskStuck() throws InterruptedException {

View File

@ -282,6 +282,7 @@
<description>The max timeout before receiving remote task's first heartbeat. <description>The max timeout before receiving remote task's first heartbeat.
This parameter is in order to avoid waiting for the container This parameter is in order to avoid waiting for the container
to start indefinitely, which made task stuck in the NEW state. to start indefinitely, which made task stuck in the NEW state.
A value of 0 disables the timeout.
</description> </description>
</property> </property>