diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index 456f2a66c8f..9439a7be8d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -192,7 +192,8 @@ public class TaskHeartbeatHandler extends AbstractService { (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); // when container in NM not started in a long time, // we think the taskAttempt is stuck - boolean taskStuck = (!entry.getValue().isReported()) && + boolean taskStuck = (taskStuckTimeOut > 0) && + (!entry.getValue().isReported()) && (currentTime > (entry.getValue().getLastProgress() + taskStuckTimeOut)); @@ -225,7 +226,7 @@ public class TaskHeartbeatHandler extends AbstractService { } @VisibleForTesting - ConcurrentMap getRunningAttempts(){ + ConcurrentMap getRunningAttempts(){ return runningAttempts; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index f0368ebe871..418f09e8d35 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app; import static org.junit.Assert.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -48,7 +49,7 @@ import java.util.concurrent.ConcurrentMap; public class TestTaskHeartbeatHandler { - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings("unchecked") @Test public void testTaskTimeout() throws InterruptedException { EventHandler mockHandler = mock(EventHandler.class); @@ -81,6 +82,46 @@ public class TestTaskHeartbeatHandler { } } + @Test + @SuppressWarnings("unchecked") + public void testTaskTimeoutDisable() throws InterruptedException { + EventHandler mockHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); + + Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_STUCK_TIMEOUT_MS, 0); // no timeout + conf.setInt(MRJobConfig.TASK_TIMEOUT, 0); // no timeout + // keep TASK_PROGRESS_REPORT_INTERVAL no larger than TASK_TIMEOUT (both 0) + // so that TASK_TIMEOUT is not overridden + conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0); + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); // 10 ms + + hb.init(conf); + hb.start(); + try { + ApplicationId appId = ApplicationId.newInstance(0L, 5); + JobId jobId = MRBuilderUtils.newJobId(appId, 4); + TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); + TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); + hb.register(taid); + + ConcurrentMap + runningAttempts = hb.getRunningAttempts(); + for (Map.Entry entry + : runningAttempts.entrySet()) { + 
assertFalse(entry.getValue().isReported()); + } + + Thread.sleep(100); + + // Timeout is disabled, so the task should not be canceled + verify(mockHandler, never()).handle(any(Event.class)); + } finally { + hb.stop(); + } + } + @SuppressWarnings("unchecked") @Test public void testTaskStuck() throws InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index fa26e4d7382..1ba82d2b256 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -282,6 +282,7 @@ The max timeout before receiving remote task's first heartbeat. This parameter is in order to avoid waiting for the container to start indefinitely, which made task stuck in the NEW state. + A value of 0 disables the timeout.