MAPREDUCE-6937. Backport MAPREDUCE-6870 to branch-2 while preserving compatibility. (Peter Bacsko via Haibo Chen)
This commit is contained in:
parent
8e675d93df
commit
3f5c67df77
|
@ -643,6 +643,11 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
private float reduceProgress;
|
||||
private float cleanupProgress;
|
||||
private boolean isUber = false;
|
||||
/** Whether the job should be finished when all reducers are completed,
|
||||
regardless of having running mappers */
|
||||
private boolean finishJobWhenReducersDone;
|
||||
/** True if the job's mappers were already sent the T_KILL event */
|
||||
private boolean completingJob = false;
|
||||
|
||||
private Credentials jobCredentials;
|
||||
private Token<JobTokenIdentifier> jobToken;
|
||||
|
@ -714,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
this.maxFetchFailuresNotifications = conf.getInt(
|
||||
MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
|
||||
MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
|
||||
this.finishJobWhenReducersDone = conf.getBoolean(
|
||||
MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
|
||||
MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
|
||||
}
|
||||
|
||||
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
|
||||
|
@ -2014,7 +2022,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
TimeUnit.MILLISECONDS);
|
||||
return JobStateInternal.FAIL_WAIT;
|
||||
}
|
||||
|
||||
|
||||
checkReadyForCompletionWhenAllReducersDone(job);
|
||||
|
||||
return job.checkReadyForCommit();
|
||||
}
|
||||
|
||||
|
@ -2045,6 +2055,34 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
}
|
||||
job.metrics.killedTask(task);
|
||||
}
|
||||
|
||||
/** Improvement: if all reducers have finished, we check if we have
|
||||
restarted mappers that are still running. This can happen in a
|
||||
situation when a node becomes UNHEALTHY and mappers are rescheduled.
|
||||
See MAPREDUCE-6870 for details
|
||||
@param job The MapReduce job
|
||||
*/
|
||||
private void checkReadyForCompletionWhenAllReducersDone(final JobImpl job) {
|
||||
if (job.finishJobWhenReducersDone) {
|
||||
int totalReduces = job.getTotalReduces();
|
||||
int completedReduces = job.getCompletedReduces();
|
||||
|
||||
if (totalReduces > 0 && totalReduces == completedReduces
|
||||
&& !job.completingJob) {
|
||||
|
||||
for (TaskId mapTaskId : job.mapTasks) {
|
||||
MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
|
||||
if (!task.isFinished()) {
|
||||
LOG.info("Killing map task " + task.getID());
|
||||
job.eventHandler.handle(
|
||||
new TaskEvent(task.getID(), TaskEventType.T_KILL));
|
||||
}
|
||||
}
|
||||
|
||||
job.completingJob = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Transition class for handling jobs with no tasks
|
||||
|
|
|
@ -562,33 +562,13 @@ public class TestJobImpl {
|
|||
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
|
||||
|
||||
// replace the tasks with spied versions to return the right attempts
|
||||
Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
|
||||
List<NodeReport> nodeReports = new ArrayList<NodeReport>();
|
||||
Map<NodeReport,TaskId> nodeReportsToTaskIds =
|
||||
new HashMap<NodeReport,TaskId>();
|
||||
for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
|
||||
TaskId taskId = e.getKey();
|
||||
Task task = e.getValue();
|
||||
if (taskId.getTaskType() == TaskType.MAP) {
|
||||
// add an attempt to the task to simulate nodes
|
||||
NodeId nodeId = mock(NodeId.class);
|
||||
TaskAttempt attempt = mock(TaskAttempt.class);
|
||||
when(attempt.getNodeId()).thenReturn(nodeId);
|
||||
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||
when(attempt.getID()).thenReturn(attemptId);
|
||||
// create a spied task
|
||||
Task spied = spy(task);
|
||||
doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
|
||||
spiedTasks.put(taskId, spied);
|
||||
Map<TaskId, Task> spiedTasks = new HashMap<>();
|
||||
List<NodeReport> nodeReports = new ArrayList<>();
|
||||
Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
|
||||
|
||||
createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
|
||||
NodeState.UNHEALTHY, nodeReports);
|
||||
|
||||
// create a NodeReport based on the node id
|
||||
NodeReport report = mock(NodeReport.class);
|
||||
when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
|
||||
when(report.getNodeId()).thenReturn(nodeId);
|
||||
nodeReports.add(report);
|
||||
nodeReportsToTaskIds.put(report, taskId);
|
||||
}
|
||||
}
|
||||
// replace the tasks with the spied tasks
|
||||
job.tasks.putAll(spiedTasks);
|
||||
|
||||
|
@ -638,6 +618,82 @@ public class TestJobImpl {
|
|||
commitHandler.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobNCompletedWhenAllReducersAreFinished()
|
||||
throws Exception {
|
||||
testJobCompletionWhenReducersAreFinished(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobNotCompletedWhenAllReducersAreFinished()
|
||||
throws Exception {
|
||||
testJobCompletionWhenReducersAreFinished(false);
|
||||
}
|
||||
|
||||
private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
|
||||
throws InterruptedException, BrokenBarrierException {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
||||
conf.setInt(MRJobConfig.NUM_REDUCES, 1);
|
||||
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||
dispatcher.init(conf);
|
||||
final List<TaskEvent> killedEvents =
|
||||
Collections.synchronizedList(new ArrayList<TaskEvent>());
|
||||
dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
|
||||
@Override
|
||||
public void handle(TaskEvent event) {
|
||||
if (event.getType() == TaskEventType.T_KILL) {
|
||||
killedEvents.add(event);
|
||||
}
|
||||
}
|
||||
});
|
||||
dispatcher.start();
|
||||
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
||||
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
|
||||
CommitterEventHandler commitHandler =
|
||||
createCommitterEventHandler(dispatcher, committer);
|
||||
commitHandler.init(conf);
|
||||
commitHandler.start();
|
||||
|
||||
final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
|
||||
|
||||
// replace the tasks with spied versions to return the right attempts
|
||||
Map<TaskId, Task> spiedTasks = new HashMap<>();
|
||||
List<NodeReport> nodeReports = new ArrayList<>();
|
||||
Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
|
||||
|
||||
createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
|
||||
NodeState.RUNNING, nodeReports);
|
||||
|
||||
// replace the tasks with the spied tasks
|
||||
job.tasks.putAll(spiedTasks);
|
||||
|
||||
// finish reducer
|
||||
for (TaskId taskId: job.tasks.keySet()) {
|
||||
if (taskId.getTaskType() == TaskType.REDUCE) {
|
||||
job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
|
||||
}
|
||||
}
|
||||
|
||||
dispatcher.await();
|
||||
|
||||
/*
|
||||
* StubbedJob cannot finish in this test - we'd have to generate the
|
||||
* necessary events in this test manually, but that wouldn't add too
|
||||
* much value. Instead, we validate the T_KILL events.
|
||||
*/
|
||||
if (killMappers) {
|
||||
Assert.assertEquals("Number of killed events", 2, killedEvents.size());
|
||||
Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000",
|
||||
killedEvents.get(0).getTaskID().toString());
|
||||
Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001",
|
||||
killedEvents.get(1).getTaskID().toString());
|
||||
} else {
|
||||
Assert.assertEquals("Number of killed events", 0, killedEvents.size());
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestJobImpl t = new TestJobImpl();
|
||||
t.testJobNoTasks();
|
||||
|
@ -985,6 +1041,37 @@ public class TestJobImpl {
|
|||
Assert.assertEquals(state, job.getInternalState());
|
||||
}
|
||||
|
||||
private void createSpiedMapTasks(Map<NodeReport, TaskId>
|
||||
nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
|
||||
NodeState nodeState, List<NodeReport> nodeReports) {
|
||||
for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
|
||||
TaskId taskId = e.getKey();
|
||||
Task task = e.getValue();
|
||||
if (taskId.getTaskType() == TaskType.MAP) {
|
||||
// add an attempt to the task to simulate nodes
|
||||
NodeId nodeId = mock(NodeId.class);
|
||||
TaskAttempt attempt = mock(TaskAttempt.class);
|
||||
when(attempt.getNodeId()).thenReturn(nodeId);
|
||||
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||
when(attempt.getID()).thenReturn(attemptId);
|
||||
// create a spied task
|
||||
Task spied = spy(task);
|
||||
Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
|
||||
attemptMap.put(attemptId, attempt);
|
||||
when(spied.getAttempts()).thenReturn(attemptMap);
|
||||
doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
|
||||
spiedTasks.put(taskId, spied);
|
||||
|
||||
// create a NodeReport based on the node id
|
||||
NodeReport report = mock(NodeReport.class);
|
||||
when(report.getNodeState()).thenReturn(nodeState);
|
||||
when(report.getNodeId()).thenReturn(nodeId);
|
||||
nodeReports.add(report);
|
||||
nodeReportsToTaskIds.put(report, taskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class JobSubmittedEventHandler implements
|
||||
EventHandler<JobHistoryEvent> {
|
||||
|
||||
|
|
|
@ -387,7 +387,7 @@ public interface MRJobConfig {
|
|||
public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
|
||||
|
||||
public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
|
||||
|
||||
|
||||
public static final String JOB_RUNNING_MAP_LIMIT =
|
||||
"mapreduce.job.running.map.limit";
|
||||
public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
|
||||
|
@ -920,4 +920,13 @@ public interface MRJobConfig {
|
|||
* A comma-separated list of properties whose value will be redacted.
|
||||
*/
|
||||
String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
|
||||
|
||||
/**
|
||||
* Specifies whether the job should complete once all reducers
|
||||
* have finished, regardless of whether there are still running mappers.
|
||||
*/
|
||||
String FINISH_JOB_WHEN_REDUCERS_DONE =
|
||||
"mapreduce.job.finish-when-all-reducers-done";
|
||||
|
||||
boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = false;
|
||||
}
|
||||
|
|
|
@ -1461,6 +1461,14 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.job.finish-when-all-reducers-done</name>
|
||||
<value>false</value>
|
||||
<description>Specifies whether the job should complete once all reducers
|
||||
have finished, regardless of whether there are still running mappers.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.job.token.tracking.ids.enabled</name>
|
||||
<value>false</value>
|
||||
|
|
Loading…
Reference in New Issue