MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

Haibo Chen 2017-08-10 15:17:36 -07:00
parent 312e57b954
commit a32e0138fb
4 changed files with 160 additions and 28 deletions
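The change adds a job-level flag, mapreduce.job.finish-when-all-reducers-done (default: true). When enabled, the MR ApplicationMaster treats the job as ready for commit as soon as every reducer has succeeded, killing any still-running mappers that were rescheduled after their output had already been consumed. A minimal submission-time sketch (the driver class and job name below are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SubmitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default is true; set to false to restore the old behavior of
        // waiting for rescheduled mappers even after all reducers succeed.
        conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, true);
        Job job = Job.getInstance(conf, "finish-when-reducers-done-demo");
        // ... mapper/reducer/input/output setup elided ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }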

hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private float reduceProgress;
private float cleanupProgress;
private boolean isUber = false;
private boolean finishJobWhenReducersDone;
private boolean completingJob = false;
private Credentials jobCredentials;
private Token<JobTokenIdentifier> jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.maxFetchFailuresNotifications = conf.getInt(
MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
this.finishJobWhenReducersDone = conf.getBoolean(
MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
TimeUnit.MILLISECONDS);
return JobStateInternal.FAIL_WAIT;
}
checkReadyForCompletionWhenAllReducersDone(job);
return job.checkReadyForCommit();
}
@@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
job.metrics.killedTask(task);
}
/** Improvement: if all reducers have finished, check whether any
    restarted mappers are still running. This can happen when a node
    becomes UNHEALTHY and its mappers are rescheduled. Killing those
    mappers lets the job proceed to commit instead of waiting for map
    output that no reducer needs anymore.
    See MAPREDUCE-6870 for details. */
private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
if (job.finishJobWhenReducersDone) {
int totalReduces = job.getTotalReduces();
int completedReduces = job.getCompletedReduces();
if (totalReduces > 0 && totalReduces == completedReduces
&& !job.completingJob) {
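// completingJob ensures this block runs only once per job, so we do
// not issue duplicate T_KILL events as further task-completed events
// arrive after the job has started wrapping up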
for (TaskId mapTaskId : job.mapTasks) {
MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
if (!task.isFinished()) {
LOG.info("Killing map task " + task.getID());
job.eventHandler.handle(
new TaskEvent(task.getID(), TaskEventType.T_KILL));
}
}
job.completingJob = true;
}
}
}
}
// Transition class for handling jobs with no tasks

hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -564,33 +564,13 @@ public class TestJobImpl {
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
// replace the tasks with spied versions to return the right attempts
-Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
-List<NodeReport> nodeReports = new ArrayList<NodeReport>();
-Map<NodeReport,TaskId> nodeReportsToTaskIds =
-    new HashMap<NodeReport,TaskId>();
-for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
-  TaskId taskId = e.getKey();
-  Task task = e.getValue();
-  if (taskId.getTaskType() == TaskType.MAP) {
-    // add an attempt to the task to simulate nodes
-    NodeId nodeId = mock(NodeId.class);
-    TaskAttempt attempt = mock(TaskAttempt.class);
-    when(attempt.getNodeId()).thenReturn(nodeId);
-    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-    when(attempt.getID()).thenReturn(attemptId);
-    // create a spied task
-    Task spied = spy(task);
-    doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
-    spiedTasks.put(taskId, spied);
-    // create a NodeReport based on the node id
-    NodeReport report = mock(NodeReport.class);
-    when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
-    when(report.getNodeId()).thenReturn(nodeId);
-    nodeReports.add(report);
-    nodeReportsToTaskIds.put(report, taskId);
-  }
-}
+Map<TaskId, Task> spiedTasks = new HashMap<>();
+List<NodeReport> nodeReports = new ArrayList<>();
+Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+    NodeState.UNHEALTHY, nodeReports);
// replace the tasks with the spied tasks
job.tasks.putAll(spiedTasks);
@@ -641,6 +621,82 @@ public class TestJobImpl {
commitHandler.stop();
}
@Test
public void testJobCompletedWhenAllReducersAreFinished()
throws Exception {
testJobCompletionWhenReducersAreFinished(true);
}
@Test
public void testJobNotCompletedWhenAllReducersAreFinished()
throws Exception {
testJobCompletionWhenReducersAreFinished(false);
}
private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
throws InterruptedException, BrokenBarrierException {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
conf.setInt(MRJobConfig.NUM_REDUCES, 1);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
final List<TaskEvent> killedEvents =
Collections.synchronizedList(new ArrayList<TaskEvent>());
dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
@Override
public void handle(TaskEvent event) {
if (event.getType() == TaskEventType.T_KILL) {
killedEvents.add(event);
}
}
});
dispatcher.start();
CyclicBarrier syncBarrier = new CyclicBarrier(2);
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
// replace the tasks with spied versions to return the right attempts
Map<TaskId, Task> spiedTasks = new HashMap<>();
List<NodeReport> nodeReports = new ArrayList<>();
Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
NodeState.RUNNING, nodeReports);
// replace the tasks with the spied tasks
job.tasks.putAll(spiedTasks);
// finish reducer
for (TaskId taskId: job.tasks.keySet()) {
if (taskId.getTaskType() == TaskType.REDUCE) {
job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
}
}
dispatcher.await();
/*
* StubbedJob cannot finish in this test - we'd have to generate the
* necessary events in this test manually, but that wouldn't add too
* much value. Instead, we validate the T_KILL events.
*/
if (killMappers) {
Assert.assertEquals("Number of killed events", 2, killedEvents.size());
Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000",
killedEvents.get(0).getTaskID().toString());
Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001",
killedEvents.get(1).getTaskID().toString());
} else {
Assert.assertEquals("Number of killed events", 0, killedEvents.size());
}
}
public static void main(String[] args) throws Exception {
TestJobImpl t = new TestJobImpl();
t.testJobNoTasks();
@@ -1021,6 +1077,37 @@ public class TestJobImpl {
Assert.assertEquals(state, job.getInternalState());
}
private void createSpiedMapTasks(Map<NodeReport, TaskId>
nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
NodeState nodeState, List<NodeReport> nodeReports) {
for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
TaskId taskId = e.getKey();
Task task = e.getValue();
if (taskId.getTaskType() == TaskType.MAP) {
// add an attempt to the task to simulate nodes
NodeId nodeId = mock(NodeId.class);
TaskAttempt attempt = mock(TaskAttempt.class);
when(attempt.getNodeId()).thenReturn(nodeId);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
when(attempt.getID()).thenReturn(attemptId);
// create a spied task
Task spied = spy(task);
Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
attemptMap.put(attemptId, attempt);
when(spied.getAttempts()).thenReturn(attemptMap);
doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
spiedTasks.put(taskId, spied);
// create a NodeReport based on the node id
NodeReport report = mock(NodeReport.class);
when(report.getNodeState()).thenReturn(nodeState);
when(report.getNodeId()).thenReturn(nodeId);
nodeReports.add(report);
nodeReportsToTaskIds.put(report, taskId);
}
}
}
private static class JobSubmittedEventHandler implements
EventHandler<JobHistoryEvent> {

hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -431,7 +431,7 @@ public interface MRJobConfig {
public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
public static final String JOB_RUNNING_MAP_LIMIT =
"mapreduce.job.running.map.limit";
public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
@@ -1033,4 +1033,8 @@ public interface MRJobConfig {
String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf";
String FINISH_JOB_WHEN_REDUCERS_DONE =
"mapreduce.job.finish-when-all-reducers-done";
boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
}

hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1125,6 +1125,14 @@
</description>
</property>
<property>
<name>mapreduce.job.finish-when-all-reducers-done</name>
<value>true</value>
<description>Specifies whether the job should complete once all reducers
have finished, regardless of whether there are still running mappers.
</description>
</property>
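Cluster operators can change this default in mapred-site.xml, and individual jobs can override it at submission time, e.g. with "hadoop jar <job.jar> <driver> -D mapreduce.job.finish-when-all-reducers-done=false ..." for any Tool-based driver (the jar and driver names here are placeholders). Setting it to false restores the previous behavior of waiting for all mappers, which may matter for jobs whose map tasks have side effects beyond producing map output.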
<property>
<name>mapreduce.job.token.tracking.ids.enabled</name>
<value>false</value>