diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index a9fcbe2fc3b..8151286e871 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -342,9 +342,15 @@ public class TaskAttemptListenerImpl extends CompositeService AtomicReference lastStatusRef = attemptIdToStatus.get(yarnAttemptID); if (lastStatusRef == null) { - LOG.error("Status update was called with illegal TaskAttemptId: " - + yarnAttemptID); - return false; + // The task is not known, but it could be in the process of tearing + // down gracefully or receiving a thread dump signal. Tolerate unknown + // tasks as long as they have unregistered recently. + if (!taskHeartbeatHandler.hasRecentlyUnregistered(yarnAttemptID)) { + LOG.error("Status update was called with illegal TaskAttemptId: " + + yarnAttemptID); + return false; + } + return true; } taskHeartbeatHandler.progressing(yarnAttemptID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index 6a716c7332e..a05960ce4aa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -70,12 +70,14 @@ public class TaskHeartbeatHandler extends AbstractService { private Thread lostTaskCheckerThread; private volatile boolean stopped; private long taskTimeOut; + private long unregisterTimeOut; private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. private final EventHandler eventHandler; private final Clock clock; private ConcurrentMap runningAttempts; + private ConcurrentMap recentlyUnregisteredAttempts; public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock, int numThreads) { @@ -84,6 +86,8 @@ public class TaskHeartbeatHandler extends AbstractService { this.clock = clock; runningAttempts = new ConcurrentHashMap(16, 0.75f, numThreads); + recentlyUnregisteredAttempts = + new ConcurrentHashMap(16, 0.75f, numThreads); } @Override @@ -91,6 +95,8 @@ public class TaskHeartbeatHandler extends AbstractService { super.serviceInit(conf); taskTimeOut = conf.getLong( MRJobConfig.TASK_TIMEOUT, MRJobConfig.DEFAULT_TASK_TIMEOUT_MILLIS); + unregisterTimeOut = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT, + MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); // enforce task timeout is at least twice as long as task report interval long taskProgressReportIntervalMillis = MRJobConfUtil. @@ -139,6 +145,12 @@ public class TaskHeartbeatHandler extends AbstractService { public void unregister(TaskAttemptId attemptID) { runningAttempts.remove(attemptID); + recentlyUnregisteredAttempts.put(attemptID, + new ReportTime(clock.getTime())); + } + + public boolean hasRecentlyUnregistered(TaskAttemptId attemptID) { + return recentlyUnregisteredAttempts.containsKey(attemptID); } private class PingChecker implements Runnable { @@ -146,27 +158,9 @@ public class TaskHeartbeatHandler extends AbstractService { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { - Iterator> iterator = - runningAttempts.entrySet().iterator(); - - // avoid calculating current time everytime in loop long currentTime = clock.getTime(); - - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - boolean taskTimedOut = (taskTimeOut > 0) && - (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); - - if(taskTimedOut) { - // task is lost, remove from the list and raise lost event - iterator.remove(); - eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry - .getKey(), "AttemptID:" + entry.getKey().toString() - + " Timed out after " + taskTimeOut / 1000 + " secs")); - eventHandler.handle(new TaskAttemptEvent(entry.getKey(), - TaskAttemptEventType.TA_TIMED_OUT)); - } - } + checkRunning(currentTime); + checkRecentlyUnregistered(currentTime); try { Thread.sleep(taskTimeOutCheckInterval); } catch (InterruptedException e) { @@ -175,6 +169,39 @@ public class TaskHeartbeatHandler extends AbstractService { } } } + + private void checkRunning(long currentTime) { + Iterator> iterator = + runningAttempts.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + boolean taskTimedOut = (taskTimeOut > 0) && + (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); + + if(taskTimedOut) { + // task is lost, remove from the list and raise lost event + iterator.remove(); + eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry + .getKey(), "AttemptID:" + entry.getKey().toString() + + " Timed out after " + taskTimeOut / 1000 + " secs")); + eventHandler.handle(new TaskAttemptEvent(entry.getKey(), + TaskAttemptEventType.TA_TIMED_OUT)); + } + } + } + + private void checkRecentlyUnregistered(long currentTime) { + Iterator iterator = + recentlyUnregisteredAttempts.values().iterator(); + while (iterator.hasNext()) { + ReportTime unregisteredTime = iterator.next(); + if (currentTime > + unregisteredTime.getLastProgress() + unregisterTimeOut) { + iterator.remove(); + } + } + } } @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 38cba8a9529..53a2ba00b3a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.mapred; +import com.google.common.base.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; @@ -51,11 +54,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.After; import org.junit.Assert; @@ -398,14 +403,52 @@ public class TestTaskAttemptListenerImpl { } @Test - public void testStatusUpdateFromUnregisteredTask() - throws IOException, InterruptedException{ + public void testStatusUpdateFromUnregisteredTask() throws Exception { configureMocks(); - startListener(false); + ControlledClock clock = new ControlledClock(); + clock.setTime(0); + doReturn(clock).when(appCtx).getClock(); - boolean taskFound = listener.statusUpdate(attemptID, firstReduceStatus); + final TaskAttemptListenerImpl tal = new TaskAttemptListenerImpl(appCtx, + secret, rmHeartbeatHandler, null) { + @Override + protected void startRpcServer() { + // Empty + } + @Override + protected void stopRpcServer() { + // Empty + } + }; - assertFalse(taskFound); + Configuration conf = new Configuration(); + conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1); + tal.init(conf); + tal.start(); + + assertFalse(tal.statusUpdate(attemptID, firstReduceStatus)); + tal.registerPendingTask(task, wid); + tal.registerLaunchedTask(attemptId, wid); + assertTrue(tal.statusUpdate(attemptID, firstReduceStatus)); + + // verify attempt is still reported as found if recently unregistered + tal.unregister(attemptId, wid); + assertTrue(tal.statusUpdate(attemptID, firstReduceStatus)); + + // verify attempt is not found if not recently unregistered + long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT, + MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); + clock.setTime(unregisterTimeout + 1); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + return !tal.statusUpdate(attemptID, firstReduceStatus); + } catch (Exception e) { + throw new RuntimeException("status update failed", e); + } + } + }, 10, 10000); } private void configureMocks() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index 26238494063..5d86479ef87 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -30,10 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Assert; import org.junit.Test; @@ -105,6 +108,41 @@ public class TestTaskHeartbeatHandler { verifyTaskTimeoutConfig(conf, expectedTimeout); } + @Test + public void testTaskUnregistered() throws Exception { + EventHandler mockHandler = mock(EventHandler.class); + ControlledClock clock = new ControlledClock(); + clock.setTime(0); + final TaskHeartbeatHandler hb = + new TaskHeartbeatHandler(mockHandler, clock, 1); + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1); + hb.init(conf); + hb.start(); + try { + ApplicationId appId = ApplicationId.newInstance(0l, 5); + JobId jobId = MRBuilderUtils.newJobId(appId, 4); + TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP); + final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2); + Assert.assertFalse(hb.hasRecentlyUnregistered(taid)); + hb.register(taid); + Assert.assertFalse(hb.hasRecentlyUnregistered(taid)); + hb.unregister(taid); + Assert.assertTrue(hb.hasRecentlyUnregistered(taid)); + long unregisterTimeout = conf.getLong(MRJobConfig.TASK_EXIT_TIMEOUT, + MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT); + clock.setTime(unregisterTimeout + 1); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return !hb.hasRecentlyUnregistered(taid); + } + }, 10, 10000); + } finally { + hb.stop(); + } + } + /** * Test if task timeout is set properly in response to the configuration of * the task progress report interval.