diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 7071e14efe5..2b2fbd0089d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -643,6 +643,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private boolean finishJobWhenReducersDone; + private boolean completingJob = false; private Credentials jobCredentials; private Token jobToken; @@ -716,6 +718,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); + this.finishJobWhenReducersDone = conf.getBoolean( + MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, + MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2018,7 +2023,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2049,6 +2056,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. + See MAPREDUCE-6870 for details */ + private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { + int totalReduces = job.getTotalReduces(); + int completedReduces = job.getCompletedReduces(); + + if (totalReduces > 0 && totalReduces == completedReduces + && !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { + MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); + if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); + } + } + + job.completingJob = true; + } + } + } } // Transition class for handling jobs with no tasks diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 6deae2a8e13..5369284b2fe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -564,33 +564,13 @@ public class TestJobImpl { dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); // replace the tasks with spied versions to return the right attempts - Map spiedTasks = new HashMap(); - List nodeReports = new ArrayList(); - Map nodeReportsToTaskIds = - new HashMap(); - for (Map.Entry e: job.tasks.entrySet()) { - TaskId taskId = e.getKey(); - Task task = e.getValue(); - if (taskId.getTaskType() == TaskType.MAP) { - // add an attempt to the task to simulate nodes - NodeId nodeId = mock(NodeId.class); - TaskAttempt attempt = mock(TaskAttempt.class); - when(attempt.getNodeId()).thenReturn(nodeId); - TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); - when(attempt.getID()).thenReturn(attemptId); - // create a spied task - Task spied = spy(task); - doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class)); - spiedTasks.put(taskId, spied); + Map spiedTasks = new HashMap<>(); + List nodeReports = new ArrayList<>(); + Map nodeReportsToTaskIds = new HashMap<>(); + + createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job, + NodeState.UNHEALTHY, nodeReports); - // create a NodeReport based on the node id - NodeReport report = mock(NodeReport.class); - when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY); - when(report.getNodeId()).thenReturn(nodeId); - nodeReports.add(report); - nodeReportsToTaskIds.put(report, taskId); - } - } // replace the tasks with the spied tasks job.tasks.putAll(spiedTasks); @@ -641,6 +621,82 @@ public class TestJobImpl { commitHandler.stop(); } + @Test + public void testJobNCompletedWhenAllReducersAreFinished() + throws Exception { + testJobCompletionWhenReducersAreFinished(true); + } + + @Test + public void testJobNotCompletedWhenAllReducersAreFinished() + throws Exception { + testJobCompletionWhenReducersAreFinished(false); + } + + private void testJobCompletionWhenReducersAreFinished(boolean killMappers) + throws InterruptedException, BrokenBarrierException { + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.setInt(MRJobConfig.NUM_REDUCES, 1); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + final List killedEvents = + Collections.synchronizedList(new ArrayList()); + dispatcher.register(TaskEventType.class, new EventHandler() { + @Override + public void handle(TaskEvent event) { + if (event.getType() == TaskEventType.T_KILL) { + killedEvents.add(event); + } + } + }); + dispatcher.start(); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true); + CommitterEventHandler commitHandler = + createCommitterEventHandler(dispatcher, committer); + commitHandler.init(conf); + commitHandler.start(); + + final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); + + // replace the tasks with spied versions to return the right attempts + Map spiedTasks = new HashMap<>(); + List nodeReports = new ArrayList<>(); + Map nodeReportsToTaskIds = new HashMap<>(); + + createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job, + NodeState.RUNNING, nodeReports); + + // replace the tasks with the spied tasks + job.tasks.putAll(spiedTasks); + + // finish reducer + for (TaskId taskId: job.tasks.keySet()) { + if (taskId.getTaskType() == TaskType.REDUCE) { + job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED)); + } + } + + dispatcher.await(); + + /* + * StubbedJob cannot finish in this test - we'd have to generate the + * necessary events in this test manually, but that wouldn't add too + * much value. Instead, we validate the T_KILL events. + */ + if (killMappers) { + Assert.assertEquals("Number of killed events", 2, killedEvents.size()); + Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000", + killedEvents.get(0).getTaskID().toString()); + Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001", + killedEvents.get(1).getTaskID().toString()); + } else { + Assert.assertEquals("Number of killed events", 0, killedEvents.size()); + } + } + public static void main(String[] args) throws Exception { TestJobImpl t = new TestJobImpl(); t.testJobNoTasks(); @@ -1021,6 +1077,37 @@ public class TestJobImpl { Assert.assertEquals(state, job.getInternalState()); } + private void createSpiedMapTasks(Map + nodeReportsToTaskIds, Map spiedTasks, JobImpl job, + NodeState nodeState, List nodeReports) { + for (Map.Entry e: job.tasks.entrySet()) { + TaskId taskId = e.getKey(); + Task task = e.getValue(); + if (taskId.getTaskType() == TaskType.MAP) { + // add an attempt to the task to simulate nodes + NodeId nodeId = mock(NodeId.class); + TaskAttempt attempt = mock(TaskAttempt.class); + when(attempt.getNodeId()).thenReturn(nodeId); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + when(attempt.getID()).thenReturn(attemptId); + // create a spied task + Task spied = spy(task); + Map attemptMap = new HashMap<>(); + attemptMap.put(attemptId, attempt); + when(spied.getAttempts()).thenReturn(attemptMap); + doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class)); + spiedTasks.put(taskId, spied); + + // create a NodeReport based on the node id + NodeReport report = mock(NodeReport.class); + when(report.getNodeState()).thenReturn(nodeState); + when(report.getNodeId()).thenReturn(nodeId); + nodeReports.add(report); + nodeReportsToTaskIds.put(report, taskId); + } + } + } + private static class JobSubmittedEventHandler implements EventHandler { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index ae371c4c1b4..d15d6db61bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -412,7 +412,7 @@ public interface MRJobConfig { public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " "; - + public static final String JOB_RUNNING_MAP_LIMIT = "mapreduce.job.running.map.limit"; public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0; @@ -945,4 +945,8 @@ public interface MRJobConfig { * A comma-separated list of properties whose value will be redacted. */ String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties"; + + String FINISH_JOB_WHEN_REDUCERS_DONE = + "mapreduce.job.finish-when-all-reducers-done"; + boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = false; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index e5cd6605922..a0a969fa706 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1122,6 +1122,14 @@ + + mapreduce.job.finish-when-all-reducers-done + false + Specifies whether the job should complete once all reducers + have finished, regardless of whether there are still running mappers. + + + mapreduce.job.token.tracking.ids.enabled false