MAPREDUCE-5817. Mappers get rescheduled on node transition even after all reducers are completed. (Sangjin Lee via kasha)

This commit is contained in:
Karthik Kambatla 2015-08-14 12:29:50 -07:00
parent 84bf71295a
commit 27d24f96ab
3 changed files with 156 additions and 15 deletions

View File

@ -552,6 +552,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu)
MAPREDUCE-5817. Mappers get rescheduled on node transition even after all
reducers are completed. (Sangjin Lee via kasha)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** Implementation of Job interface. Maintains the state machines of Job.
@ -1330,15 +1331,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
// rerun previously successful map tasks
List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
if(taskAttemptIdList != null) {
String mesg = "TaskAttempt killed because it ran on unusable node "
+ nodeId;
for(TaskAttemptId id : taskAttemptIdList) {
if(TaskType.MAP == id.getTaskId().getTaskType()) {
// reschedule only map tasks because their outputs maybe unusable
LOG.info(mesg + ". AttemptId:" + id);
eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
// do this only if the job is still in the running state and there are
// running reducers
if (getInternalState() == JobStateInternal.RUNNING &&
!allReducersComplete()) {
List<TaskAttemptId> taskAttemptIdList =
nodesToSucceededTaskAttempts.get(nodeId);
if (taskAttemptIdList != null) {
String mesg = "TaskAttempt killed because it ran on unusable node "
+ nodeId;
for (TaskAttemptId id : taskAttemptIdList) {
if (TaskType.MAP == id.getTaskId().getTaskType()) {
// reschedule only map tasks because their outputs maybe unusable
LOG.info(mesg + ". AttemptId:" + id);
eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
}
}
}
}
@ -1346,6 +1353,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// RMContainerAllocator
}
private boolean allReducersComplete() {
return numReduceTasks == 0 || numReduceTasks == getCompletedReduces();
}
/*
private int getBlockSize() {
String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
@ -2084,13 +2095,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
}
@VisibleForTesting
void decrementSucceededMapperCount() {
completedTaskCount--;
succeededMapTaskCount--;
}
private static class MapTaskRescheduledTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
//succeeded map task is restarted back
job.completedTaskCount--;
job.succeededMapTaskCount--;
job.decrementSucceededMapperCount();
}
}

View File

@ -19,15 +19,21 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@ -35,10 +41,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRConfig;
@ -46,10 +48,17 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@ -64,7 +73,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@ -76,6 +89,9 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -508,6 +524,112 @@ public class TestJobImpl {
commitHandler.stop();
}
@Test(timeout=20000)
public void testUnusableNodeTransition() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
conf.setInt(MRJobConfig.NUM_REDUCES, 1);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
CyclicBarrier syncBarrier = new CyclicBarrier(2);
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
// add a special task event handler to put the task back to running in case
// of task rescheduling/killing
EventHandler<TaskAttemptEvent> taskAttemptEventHandler =
new EventHandler<TaskAttemptEvent>() {
@Override
public void handle(TaskAttemptEvent event) {
if (event.getType() == TaskAttemptEventType.TA_KILL) {
job.decrementSucceededMapperCount();
}
}
};
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
// replace the tasks with spied versions to return the right attempts
Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
List<NodeReport> nodeReports = new ArrayList<NodeReport>();
Map<NodeReport,TaskId> nodeReportsToTaskIds =
new HashMap<NodeReport,TaskId>();
for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
TaskId taskId = e.getKey();
Task task = e.getValue();
if (taskId.getTaskType() == TaskType.MAP) {
// add an attempt to the task to simulate nodes
NodeId nodeId = mock(NodeId.class);
TaskAttempt attempt = mock(TaskAttempt.class);
when(attempt.getNodeId()).thenReturn(nodeId);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
when(attempt.getID()).thenReturn(attemptId);
// create a spied task
Task spied = spy(task);
doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
spiedTasks.put(taskId, spied);
// create a NodeReport based on the node id
NodeReport report = mock(NodeReport.class);
when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
when(report.getNodeId()).thenReturn(nodeId);
nodeReports.add(report);
nodeReportsToTaskIds.put(report, taskId);
}
}
// replace the tasks with the spied tasks
job.tasks.putAll(spiedTasks);
// complete all mappers first
for (TaskId taskId: job.tasks.keySet()) {
if (taskId.getTaskType() == TaskType.MAP) {
// generate a task attempt completed event first to populate the
// nodes-to-succeeded-attempts map
TaskAttemptCompletionEvent tce =
Records.newRecord(TaskAttemptCompletionEvent.class);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
tce.setAttemptId(attemptId);
tce.setStatus(TaskAttemptCompletionEventStatus.SUCCEEDED);
job.handle(new JobTaskAttemptCompletedEvent(tce));
// complete the task itself
job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
Assert.assertEquals(JobState.RUNNING, job.getState());
}
}
// add an event for a node transition
NodeReport firstMapperNodeReport = nodeReports.get(0);
NodeReport secondMapperNodeReport = nodeReports.get(1);
job.handle(new JobUpdatedNodesEvent(job.getID(),
Collections.singletonList(firstMapperNodeReport)));
// complete the reducer
for (TaskId taskId: job.tasks.keySet()) {
if (taskId.getTaskType() == TaskType.REDUCE) {
job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
}
}
// add another event for a node transition for the other mapper
// this should not trigger rescheduling
job.handle(new JobUpdatedNodesEvent(job.getID(),
Collections.singletonList(secondMapperNodeReport)));
// complete the first mapper that was rescheduled
TaskId firstMapper = nodeReportsToTaskIds.get(firstMapperNodeReport);
job.handle(new JobTaskEvent(firstMapper, TaskState.SUCCEEDED));
// verify the state is moving to committing
assertJobState(job, JobStateInternal.COMMITTING);
// let the committer complete and verify the job succeeds
syncBarrier.await();
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
}
public static void main(String[] args) throws Exception {
TestJobImpl t = new TestJobImpl();
t.testJobNoTasks();