diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 66a9aac59d3..2113c76e9d5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -283,6 +283,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu) + MAPREDUCE-5817. Mappers get rescheduled on node transition even after all + reducers are completed. (Sangjin Lee via kasha) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 8623dc5a716..ab94960bc8b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** Implementation of Job interface. Maintains the state machines of Job. @@ -1328,15 +1329,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { // rerun previously successful map tasks - List taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); - if(taskAttemptIdList != null) { - String mesg = "TaskAttempt killed because it ran on unusable node " - + nodeId; - for(TaskAttemptId id : taskAttemptIdList) { - if(TaskType.MAP == id.getTaskId().getTaskType()) { - // reschedule only map tasks because their outputs maybe unusable - LOG.info(mesg + ". AttemptId:" + id); - eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + // do this only if the job is still in the running state and there are + // running reducers + if (getInternalState() == JobStateInternal.RUNNING && + !allReducersComplete()) { + List taskAttemptIdList = + nodesToSucceededTaskAttempts.get(nodeId); + if (taskAttemptIdList != null) { + String mesg = "TaskAttempt killed because it ran on unusable node " + + nodeId; + for (TaskAttemptId id : taskAttemptIdList) { + if (TaskType.MAP == id.getTaskId().getTaskType()) { + // reschedule only map tasks because their outputs maybe unusable + LOG.info(mesg + ". AttemptId:" + id); + eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + } } } } @@ -1344,6 +1351,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // RMContainerAllocator } + private boolean allReducersComplete() { + return numReduceTasks == 0 || numReduceTasks == getCompletedReduces(); + } + /* private int getBlockSize() { String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR); @@ -2082,13 +2093,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } + @VisibleForTesting + void decrementSucceededMapperCount() { + completedTaskCount--; + succeededMapTaskCount--; + } + private static class MapTaskRescheduledTransition implements SingleArcTransition { @Override public void transition(JobImpl job, JobEvent event) { //succeeded map task is restarted back - job.completedTaskCount--; - job.succeededMapTaskCount--; + job.decrementSucceededMapperCount(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index cae9663e8cc..2af4380821b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -19,15 +19,21 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -35,10 +41,6 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.jobhistory.EventType; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRConfig; @@ -46,10 +48,17 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; @@ -64,7 +73,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; @@ -76,6 +89,9 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -508,6 +524,112 @@ public class TestJobImpl { commitHandler.stop(); } + @Test(timeout=20000) + public void testUnusableNodeTransition() throws Exception { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.setInt(MRJobConfig.NUM_REDUCES, 1); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); + // add a special task event handler to put the task back to running in case + // of task rescheduling/killing + EventHandler taskAttemptEventHandler = + new EventHandler() { + @Override + public void handle(TaskAttemptEvent event) { + if (event.getType() == TaskAttemptEventType.TA_KILL) { + job.decrementSucceededMapperCount(); + } + } + }; + dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); + + // replace the tasks with spied versions to return the right attempts + Map spiedTasks = new HashMap(); + List nodeReports = new ArrayList(); + Map nodeReportsToTaskIds = + new HashMap(); + for (Map.Entry e: job.tasks.entrySet()) { + TaskId taskId = e.getKey(); + Task task = e.getValue(); + if (taskId.getTaskType() == TaskType.MAP) { + // add an attempt to the task to simulate nodes + NodeId nodeId = mock(NodeId.class); + TaskAttempt attempt = mock(TaskAttempt.class); + when(attempt.getNodeId()).thenReturn(nodeId); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + when(attempt.getID()).thenReturn(attemptId); + // create a spied task + Task spied = spy(task); + doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class)); + spiedTasks.put(taskId, spied); + + // create a NodeReport based on the node id + NodeReport report = mock(NodeReport.class); + when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY); + when(report.getNodeId()).thenReturn(nodeId); + nodeReports.add(report); + nodeReportsToTaskIds.put(report, taskId); + } + } + // replace the tasks with the spied tasks + job.tasks.putAll(spiedTasks); + + // complete all mappers first + for (TaskId taskId: job.tasks.keySet()) { + if (taskId.getTaskType() == TaskType.MAP) { + // generate a task attempt completed event first to populate the + // nodes-to-succeeded-attempts map + TaskAttemptCompletionEvent tce = + Records.newRecord(TaskAttemptCompletionEvent.class); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + tce.setAttemptId(attemptId); + tce.setStatus(TaskAttemptCompletionEventStatus.SUCCEEDED); + job.handle(new JobTaskAttemptCompletedEvent(tce)); + // complete the task itself + job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + Assert.assertEquals(JobState.RUNNING, job.getState()); + } + } + + // add an event for a node transition + NodeReport firstMapperNodeReport = nodeReports.get(0); + NodeReport secondMapperNodeReport = nodeReports.get(1); + job.handle(new JobUpdatedNodesEvent(job.getID(), + Collections.singletonList(firstMapperNodeReport))); + // complete the reducer + for (TaskId taskId: job.tasks.keySet()) { + if (taskId.getTaskType() == TaskType.REDUCE) { + job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + } + } + // add another event for a node transition for the other mapper + // this should not trigger rescheduling + job.handle(new JobUpdatedNodesEvent(job.getID(), + Collections.singletonList(secondMapperNodeReport))); + // complete the first mapper that was rescheduled + TaskId firstMapper = nodeReportsToTaskIds.get(firstMapperNodeReport); + job.handle(new JobTaskEvent(firstMapper, TaskState.SUCCEEDED)); + // verify the state is moving to committing + assertJobState(job, JobStateInternal.COMMITTING); + + // let the committer complete and verify the job succeeds + syncBarrier.await(); + assertJobState(job, JobStateInternal.SUCCEEDED); + + dispatcher.stop(); + commitHandler.stop(); + } + public static void main(String[] args) throws Exception { TestJobImpl t = new TestJobImpl(); t.testJobNoTasks();