diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0984a92ddcf..b272af1fc0e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -14,6 +14,9 @@ Release 2.0.1-alpha - UNRELEASED MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache. (tomwhite) + MAPREDUCE-3921. MR AM should act on node health status changes. + (Bikas Saha via sseth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java index e627128975c..af4aef7d9c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; /** @@ -54,6 +55,11 @@ public interface TaskAttempt { */ String getAssignedContainerMgrAddress(); + /** + * @return node's id if a container is assigned, otherwise null. + */ + NodeId getNodeId(); + /** * @return node's http address if a container is assigned, otherwise null. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java index e0223b13796..5accb6da3e9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java @@ -44,5 +44,9 @@ public enum JobEventType { JOB_COUNTER_UPDATE, //Producer:TaskAttemptListener - JOB_TASK_ATTEMPT_FETCH_FAILURE + JOB_TASK_ATTEMPT_FETCH_FAILURE, + + //Producer:RMContainerAllocator + JOB_UPDATED_NODES + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java new file mode 100644 index 00000000000..c332fb0e693 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java @@ -0,0 +1,40 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import java.util.List; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.yarn.api.records.NodeReport; + + + +public class JobUpdatedNodesEvent extends JobEvent { + + private final List updatedNodes; + public JobUpdatedNodesEvent(JobId jobId, List updatedNodes) { + super(jobId, JobEventType.JOB_UPDATED_NODES); + this.updatedNodes = updatedNodes; + } + + public List getUpdatedNodes() { + return updatedNodes; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java new file mode 100644 index 00000000000..9bcc838173e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java @@ -0,0 +1,37 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + + +public class TaskAttemptKillEvent extends TaskAttemptEvent { + + private final String message; + + public TaskAttemptKillEvent(TaskAttemptId attemptID, + String message) { + super(attemptID, TaskAttemptEventType.TA_KILL); + this.message = message; + } + + public String getMessage() { + return message; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 220bbc31d6b..10eb68dbf85 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -77,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; @@ -85,8 +86,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; @@ -100,6 +103,9 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -148,6 +154,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final Object tasksSyncHandle = new Object(); private final Set mapTasks = new LinkedHashSet(); private final Set reduceTasks = new LinkedHashSet(); + /** + * maps nodes to tasks that have run on those nodes + */ + private final HashMap> + nodesToSucceededTaskAttempts = new HashMap>(); + private final EventHandler eventHandler; private final MRAppMetrics metrics; private final String userName; @@ -194,6 +206,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, new TaskAttemptCompletedEventTransition(); private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION = new CounterUpdateTransition(); + private static final UpdatedNodesTransition UPDATED_NODES_TRANSITION = + new UpdatedNodesTransition(); protected static final StateMachineFactory @@ -218,7 +232,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition(JobState.NEW, JobState.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - + // Ignore-able events + .addTransition(JobState.NEW, JobState.NEW, + JobEventType.JOB_UPDATED_NODES) + // Transitions from INITED state .addTransition(JobState.INITED, JobState.INITED, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -234,7 +251,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition(JobState.INITED, JobState.ERROR, JobEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - + // Ignore-able events + .addTransition(JobState.INITED, JobState.INITED, + JobEventType.JOB_UPDATED_NODES) + // Transitions from RUNNING state .addTransition(JobState.RUNNING, JobState.RUNNING, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, @@ -251,6 +271,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, new JobNoTasksCompletedTransition()) .addTransition(JobState.RUNNING, JobState.KILL_WAIT, JobEventType.JOB_KILL, new KillTasksTransition()) + .addTransition(JobState.RUNNING, JobState.RUNNING, + JobEventType.JOB_UPDATED_NODES, + UPDATED_NODES_TRANSITION) .addTransition(JobState.RUNNING, JobState.RUNNING, JobEventType.JOB_MAP_TASK_RESCHEDULED, new MapTaskRescheduledTransition()) @@ -288,8 +311,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // Ignore-able events .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, EnumSet.of(JobEventType.JOB_KILL, - JobEventType.JOB_MAP_TASK_RESCHEDULED, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) + JobEventType.JOB_UPDATED_NODES, + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from SUCCEEDED state .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, @@ -303,7 +327,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, INTERNAL_ERROR_TRANSITION) // Ignore-able events .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, - EnumSet.of(JobEventType.JOB_KILL, + EnumSet.of(JobEventType.JOB_KILL, + JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from FAILED state @@ -318,7 +343,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, INTERNAL_ERROR_TRANSITION) // Ignore-able events .addTransition(JobState.FAILED, JobState.FAILED, - EnumSet.of(JobEventType.JOB_KILL, + EnumSet.of(JobEventType.JOB_KILL, + JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from KILLED state @@ -333,7 +359,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, INTERNAL_ERROR_TRANSITION) // Ignore-able events .addTransition(JobState.KILLED, JobState.KILLED, - EnumSet.of(JobEventType.JOB_KILL, + EnumSet.of(JobEventType.JOB_KILL, + JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // No transitions from INTERNAL_ERROR state. Ignore all. @@ -346,6 +373,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_DIAGNOSTIC_UPDATE, + JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.INTERNAL_ERROR)) .addTransition(JobState.ERROR, JobState.ERROR, @@ -895,7 +923,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, LOG.info(msg.toString()); } } - + /** * ChainMapper and ChainReducer must execute in parallel, so they're not * compatible with uberization/LocalContainerLauncher (100% sequential). @@ -924,6 +952,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } return isChainJob; } + + private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { + // rerun previously successful map tasks + List taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); + if(taskAttemptIdList != null) { + String mesg = "TaskAttempt killed because it ran on unusable node " + + nodeId; + for(TaskAttemptId id : taskAttemptIdList) { + if(TaskType.MAP == id.getTaskId().getTaskType()) { + // reschedule only map tasks because their outputs maybe unusable + LOG.info(mesg + ". AttemptId:" + id); + eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + } + } + } + // currently running task attempts on unusable nodes are handled in + // RMContainerAllocator + } /* private int getBlockSize() { @@ -1269,18 +1315,37 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, tce.setEventId(job.taskAttemptCompletionEvents.size()); job.taskAttemptCompletionEvents.add(tce); + TaskAttemptId attemptId = tce.getAttemptId(); + TaskId taskId = attemptId.getTaskId(); //make the previous completion event as obsolete if it exists Object successEventNo = - job.successAttemptCompletionEventNoMap.remove(tce.getAttemptId().getTaskId()); + job.successAttemptCompletionEventNoMap.remove(taskId); if (successEventNo != null) { TaskAttemptCompletionEvent successEvent = job.taskAttemptCompletionEvents.get((Integer) successEventNo); successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE); } - + + // if this attempt is not successful then why is the previous successful + // attempt being removed above - MAPREDUCE-4330 if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) { - job.successAttemptCompletionEventNoMap.put(tce.getAttemptId().getTaskId(), - tce.getEventId()); + job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId()); + + // here we could have simply called Task.getSuccessfulAttempt() but + // the event that triggers this code is sent before + // Task.successfulAttempt is set and so there is no guarantee that it + // will be available now + Task task = job.tasks.get(taskId); + TaskAttempt attempt = task.getAttempt(attemptId); + NodeId nodeId = attempt.getNodeId(); + assert (nodeId != null); // node must exist for a successful event + List taskAttemptIdList = job.nodesToSucceededTaskAttempts + .get(nodeId); + if (taskAttemptIdList == null) { + taskAttemptIdList = new ArrayList(); + job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList); + } + taskAttemptIdList.add(attempt.getID()); } } } @@ -1460,7 +1525,22 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } } - + + private static class UpdatedNodesTransition implements + SingleArcTransition { + @Override + public void transition(JobImpl job, JobEvent event) { + JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event; + for(NodeReport nr: updateEvent.getUpdatedNodes()) { + NodeState nodeState = nr.getNodeState(); + if(nodeState.isUnusable()) { + // act on the updates + job.actOnUnusableNode(nr.getNodeId(), nodeState); + } + } + } + } + private static class InternalErrorTransition implements SingleArcTransition { @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index cafff92920e..66d48b69042 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -84,6 +84,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; @@ -403,6 +404,10 @@ public abstract class TaskAttemptImpl implements TaskAttemptState.FAILED, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE, new TooManyFetchFailureTransition()) + .addTransition( + TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED, + TaskAttemptEventType.TA_KILL, + new KilledAfterSuccessTransition()) .addTransition( TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, @@ -410,8 +415,7 @@ public abstract class TaskAttemptImpl implements // Ignore-able events for SUCCEEDED state .addTransition(TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, - EnumSet.of(TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_FAILMSG, + EnumSet.of(TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) @@ -818,6 +822,16 @@ public abstract class TaskAttemptImpl implements } } + @Override + public NodeId getNodeId() { + readLock.lock(); + try { + return containerNodeId; + } finally { + readLock.unlock(); + } + } + /**If container Assigned then return the node's address, otherwise null. */ @Override @@ -999,7 +1013,7 @@ public abstract class TaskAttemptImpl implements } private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed( - TaskAttemptImpl taskAttempt) { + TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); @@ -1007,16 +1021,22 @@ public abstract class TaskAttemptImpl implements if (taskType == TaskType.MAP) { jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1); - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); + if(!taskAlreadyCompleted) { + // dont double count the elapsed time + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); + } } else { jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1); - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); + if(!taskAlreadyCompleted) { + // dont double count the elapsed time + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); + } } return jce; } private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled( - TaskAttemptImpl taskAttempt) { + TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); @@ -1024,10 +1044,16 @@ public abstract class TaskAttemptImpl implements if (taskType == TaskType.MAP) { jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1); - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); + if(!taskAlreadyCompleted) { + // dont double count the elapsed time + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); + } } else { jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1); - jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); + if(!taskAlreadyCompleted) { + // dont double count the elapsed time + jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); + } } return jce; } @@ -1259,10 +1285,10 @@ public abstract class TaskAttemptImpl implements finalState); if(finalState == TaskAttemptState.FAILED) { taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt)); + .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); } else if(finalState == TaskAttemptState.KILLED) { taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAKilled(taskAttempt)); + .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); } taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); @@ -1394,7 +1420,7 @@ public abstract class TaskAttemptImpl implements if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt)); + .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptState.FAILED); @@ -1463,7 +1489,7 @@ public abstract class TaskAttemptImpl implements if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt)); + .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptState.FAILED); @@ -1477,6 +1503,32 @@ public abstract class TaskAttemptImpl implements taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); } } + + private static class KilledAfterSuccessTransition implements + SingleArcTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event; + //add to diagnostic + taskAttempt.addDiagnosticInfo(msgEvent.getMessage()); + + // not setting a finish time since it was set on success + assert (taskAttempt.getFinishTime() != 0); + + assert (taskAttempt.getLaunchTime() != 0); + taskAttempt.eventHandler + .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true)); + TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( + taskAttempt, TaskAttemptState.KILLED); + taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId + .getTaskId().getJobId(), tauce)); + taskAttempt.eventHandler.handle(new TaskTAttemptEvent( + taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); + } + } private static class KilledTransition implements SingleArcTransition { @@ -1489,7 +1541,7 @@ public abstract class TaskAttemptImpl implements taskAttempt.setFinishTime(); if (taskAttempt.getLaunchTime() != 0) { taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAKilled(taskAttempt)); + .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, TaskAttemptState.KILLED); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 58edd1690c4..c7f89f9ac58 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -191,13 +191,14 @@ public abstract class TaskImpl implements Task, EventHandler { .addTransition(TaskState.SUCCEEDED, //only possible for map tasks EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition()) + .addTransition(TaskState.SUCCEEDED, //only possible for map tasks + EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED), + TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition()) // Ignore-able transitions. .addTransition( TaskState.SUCCEEDED, TaskState.SUCCEEDED, - EnumSet.of(TaskEventType.T_KILL, - TaskEventType.T_ADD_SPEC_ATTEMPT, - TaskEventType.T_ATTEMPT_LAUNCHED, - TaskEventType.T_ATTEMPT_KILLED)) + EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT, + TaskEventType.T_ATTEMPT_LAUNCHED)) // Transitions from FAILED state .addTransition(TaskState.FAILED, TaskState.FAILED, @@ -629,7 +630,6 @@ public abstract class TaskImpl implements Task, EventHandler { // always called inside a transition, in turn inside the Write Lock private void handleTaskAttemptCompletion(TaskAttemptId attemptId, TaskAttemptCompletionEventStatus status) { - finishedAttempts++; TaskAttempt attempt = attempts.get(attemptId); //raise the completion event only if the container is assigned // to nextAttemptNumber @@ -681,6 +681,11 @@ public abstract class TaskImpl implements Task, EventHandler { taId == null ? null : TypeConverter.fromYarn(taId)); return taskFailedEvent; } + + private static void unSucceed(TaskImpl task) { + task.commitAttempt = null; + task.successfulAttempt = null; + } /** * @return a String representation of the splits. @@ -755,6 +760,7 @@ public abstract class TaskImpl implements Task, EventHandler { task.handleTaskAttemptCompletion( ((TaskTAttemptEvent) event).getTaskAttemptID(), TaskAttemptCompletionEventStatus.SUCCEEDED); + task.finishedAttempts++; --task.numberUncompletedAttempts; task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID(); task.eventHandler.handle(new JobTaskEvent( @@ -790,6 +796,7 @@ public abstract class TaskImpl implements Task, EventHandler { task.handleTaskAttemptCompletion( ((TaskTAttemptEvent) event).getTaskAttemptID(), TaskAttemptCompletionEventStatus.KILLED); + task.finishedAttempts++; --task.numberUncompletedAttempts; if (task.successfulAttempt == null) { task.addAndScheduleAttempt(); @@ -808,6 +815,7 @@ public abstract class TaskImpl implements Task, EventHandler { task.handleTaskAttemptCompletion( ((TaskTAttemptEvent) event).getTaskAttemptID(), TaskAttemptCompletionEventStatus.KILLED); + task.finishedAttempts++; // check whether all attempts are finished if (task.finishedAttempts == task.attempts.size()) { if (task.historyTaskStartGenerated) { @@ -845,6 +853,7 @@ public abstract class TaskImpl implements Task, EventHandler { attempt.getAssignedContainerMgrAddress())); } + task.finishedAttempts++; if (task.failedAttempts < task.maxAttempts) { task.handleTaskAttemptCompletion( ((TaskTAttemptEvent) event).getTaskAttemptID(), @@ -880,12 +889,6 @@ public abstract class TaskImpl implements Task, EventHandler { protected TaskState getDefaultState(Task task) { return task.getState(); } - - protected void unSucceed(TaskImpl task) { - ++task.numberUncompletedAttempts; - task.commitAttempt = null; - task.successfulAttempt = null; - } } private static class MapRetroactiveFailureTransition @@ -908,6 +911,8 @@ public abstract class TaskImpl implements Task, EventHandler { // fails, we have to let AttemptFailedTransition.transition // believe that there's no redundancy. unSucceed(task); + // fake increase in Uncomplete attempts for super.transition + ++task.numberUncompletedAttempts; return super.transition(task, event); } @@ -917,6 +922,45 @@ public abstract class TaskImpl implements Task, EventHandler { } } + private static class MapRetroactiveKilledTransition implements + MultipleArcTransition { + + @Override + public TaskState transition(TaskImpl task, TaskEvent event) { + // verify that this occurs only for map task + // TODO: consider moving it to MapTaskImpl + if (!TaskType.MAP.equals(task.getType())) { + LOG.error("Unexpected event for REDUCE task " + event.getType()); + task.internalError(event.getType()); + } + + TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + TaskAttemptId attemptId = attemptEvent.getTaskAttemptID(); + if(task.successfulAttempt == attemptId) { + // successful attempt is now killed. reschedule + // tell the job about the rescheduling + unSucceed(task); + task.handleTaskAttemptCompletion( + attemptId, + TaskAttemptCompletionEventStatus.KILLED); + task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId)); + // typically we are here because this map task was run on a bad node and + // we want to reschedule it on a different node. + // Depending on whether there are previous failed attempts or not this + // can SCHEDULE or RESCHEDULE the container allocate request. If this + // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used + // from the map splitInfo. So the bad node might be sent as a location + // to the RM. But the RM would ignore that just like it would ignore + // currently pending container requests affinitized to bad nodes. + task.addAndScheduleAttempt(); + return TaskState.SCHEDULED; + } else { + // nothing to do + return TaskState.SUCCEEDED; + } + } + } + private static class KillNewTransition implements SingleArcTransition { @Override @@ -966,6 +1010,7 @@ public abstract class TaskImpl implements Task, EventHandler { public void transition(TaskImpl task, TaskEvent event) { task.metrics.launchedTask(task); task.metrics.runningTask(task); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index bcb82230d6a..efef456f109 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,19 +47,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -583,7 +592,9 @@ public class RMContainerAllocator extends RMContainerRequestor //Called on each allocation. Will know about newly blacklisted/added hosts. computeIgnoreBlacklisting(); - + + handleUpdatedNodes(response); + for (ContainerStatus cont : finishedContainers) { LOG.info("Received completed container " + cont.getContainerId()); TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId()); @@ -600,10 +611,48 @@ public class RMContainerAllocator extends RMContainerRequestor String diagnostics = cont.getDiagnostics(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnostics)); - } + } } return newContainers; } + + @SuppressWarnings("unchecked") + private void handleUpdatedNodes(AMResponse response) { + // send event to the job about on updated nodes + List updatedNodes = response.getUpdatedNodes(); + if (!updatedNodes.isEmpty()) { + + // send event to the job to act upon completed tasks + eventHandler.handle(new JobUpdatedNodesEvent(getJob().getID(), + updatedNodes)); + + // act upon running tasks + HashSet unusableNodes = new HashSet(); + for (NodeReport nr : updatedNodes) { + NodeState nodeState = nr.getNodeState(); + if (nodeState.isUnusable()) { + unusableNodes.add(nr.getNodeId()); + } + } + for (int i = 0; i < 2; ++i) { + HashMap taskSet = i == 0 ? assignedRequests.maps + : assignedRequests.reduces; + // kill running containers + for (Map.Entry entry : taskSet.entrySet()) { + TaskAttemptId tid = entry.getKey(); + NodeId taskAttemptNodeId = entry.getValue().getNodeId(); + if (unusableNodes.contains(taskAttemptNodeId)) { + LOG.info("Killing taskAttempt:" + tid + + " because it is running on unusable node:" + + taskAttemptNodeId); + eventHandler.handle(new TaskAttemptKillEvent(tid, + "TaskAttempt killed because it ran on unusable node" + + taskAttemptNodeId)); + } + } + } + } + } @Private public int getMemLimit() { @@ -743,7 +792,6 @@ public class RMContainerAllocator extends RMContainerRequestor boolean blackListed = false; ContainerRequest assigned = null; - ContainerId allocatedContainerId = allocated.getId(); if (isAssignable) { // do not assign if allocated container is on a // blacklisted host @@ -790,7 +838,7 @@ public class RMContainerAllocator extends RMContainerRequestor eventHandler.handle(new TaskAttemptContainerAssignedEvent( assigned.attemptID, allocated, applicationACLs)); - assignedRequests.add(allocatedContainerId, assigned.attemptID); + assignedRequests.add(allocated, assigned.attemptID); if (LOG.isDebugEnabled()) { LOG.info("Assigned container (" + allocated + ") " @@ -811,7 +859,7 @@ public class RMContainerAllocator extends RMContainerRequestor // or if we could not assign it if (blackListed || assigned == null) { containersReleased++; - release(allocatedContainerId); + release(allocated.getId()); } } } @@ -974,20 +1022,20 @@ public class RMContainerAllocator extends RMContainerRequestor private class AssignedRequests { private final Map containerToAttemptMap = new HashMap(); - private final LinkedHashMap maps = - new LinkedHashMap(); - private final LinkedHashMap reduces = - new LinkedHashMap(); + private final LinkedHashMap maps = + new LinkedHashMap(); + private final LinkedHashMap reduces = + new LinkedHashMap(); private final Set preemptionWaitingReduces = new HashSet(); - void add(ContainerId containerId, TaskAttemptId tId) { - LOG.info("Assigned container " + containerId.toString() + " to " + tId); - containerToAttemptMap.put(containerId, tId); + void add(Container container, TaskAttemptId tId) { + LOG.info("Assigned container " + container.getId().toString() + " to " + tId); + containerToAttemptMap.put(container.getId(), tId); if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { - maps.put(tId, containerId); + maps.put(tId, container); } else { - reduces.put(tId, containerId); + reduces.put(tId, container); } } @@ -1017,9 +1065,9 @@ public class RMContainerAllocator extends RMContainerRequestor boolean remove(TaskAttemptId tId) { ContainerId containerId = null; if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { - containerId = maps.remove(tId); + containerId = maps.remove(tId).getId(); } else { - containerId = reduces.remove(tId); + containerId = reduces.remove(tId).getId(); if (containerId != null) { boolean preempted = preemptionWaitingReduces.remove(tId); if (preempted) { @@ -1038,12 +1086,20 @@ public class RMContainerAllocator extends RMContainerRequestor TaskAttemptId get(ContainerId cId) { return containerToAttemptMap.get(cId); } + + NodeId getNodeId(TaskAttemptId tId) { + if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { + return maps.get(tId).getNodeId(); + } else { + return reduces.get(tId).getNodeId(); + } + } ContainerId get(TaskAttemptId tId) { if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { - return maps.get(tId); + return maps.get(tId).getId(); } else { - return reduces.get(tId); + return reduces.get(tId).getId(); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 81af358bc23..7f1d8200327 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; @@ -231,6 +232,11 @@ public class MockJobs extends MockApps { final List diags = Lists.newArrayList(); diags.add(DIAGS.next()); return new TaskAttempt() { + @Override + public NodeId getNodeId() throws UnsupportedOperationException{ + throw new UnsupportedOperationException(); + } + @Override public TaskAttemptId getID() { return taid; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index 3ca9c24bad4..a0d8e778241 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.ArrayList; import java.util.Iterator; import junit.framework.Assert; @@ -29,17 +30,26 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; /** @@ -160,6 +170,159 @@ public class TestMRApp { app.waitForState(job, JobState.SUCCEEDED); } + + /** + * The test verifies that the AM re-runs maps that have run on bad nodes. It + * also verifies that the AM records all success/killed events so that reduces + * are notified about map output status changes. It also verifies that the + * re-run information is preserved across AM restart + */ + @Test + public void testUpdatedNodes() throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), + true, ++runCount); + Configuration conf = new Configuration(); + // after half of the map completion, reduce will start + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f); + // uberization forces full slowstart (1.0), so disable that + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask1 = it.next(); + Task mapTask2 = it.next(); + + // all maps must be running + app.waitForState(mapTask1, TaskState.RUNNING); + app.waitForState(mapTask2, TaskState.RUNNING); + + TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator() + .next(); + TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator() + .next(); + NodeId node1 = task1Attempt.getNodeId(); + NodeId node2 = task2Attempt.getNodeId(); + Assert.assertEquals(node1, node2); + + // send the done signal to the task + app.getContext() + .getEventHandler() + .handle( + new TaskAttemptEvent(task1Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + app.getContext() + .getEventHandler() + .handle( + new TaskAttemptEvent(task2Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + + // all maps must be succeeded + app.waitForState(mapTask1, TaskState.SUCCEEDED); + app.waitForState(mapTask2, TaskState.SUCCEEDED); + + TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0, + 100); + Assert.assertEquals("Expecting 2 completion events for success", 2, + events.length); + + // send updated nodes info + ArrayList updatedNodes = new ArrayList(); + NodeReport nr = RecordFactoryProvider.getRecordFactory(null) + .newRecordInstance(NodeReport.class); + nr.setNodeId(node1); + nr.setNodeState(NodeState.UNHEALTHY); + updatedNodes.add(nr); + app.getContext().getEventHandler() + .handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes)); + + app.waitForState(task1Attempt, TaskAttemptState.KILLED); + app.waitForState(task2Attempt, TaskAttemptState.KILLED); + + events = job.getTaskAttemptCompletionEvents(0, 100); + Assert.assertEquals("Expecting 2 more completion events for killed", 4, + events.length); + + // all maps must be back to running + app.waitForState(mapTask1, TaskState.RUNNING); + app.waitForState(mapTask2, TaskState.RUNNING); + + Iterator itr = mapTask1.getAttempts().values().iterator(); + itr.next(); + task1Attempt = itr.next(); + + // send the done signal to the task + app.getContext() + .getEventHandler() + .handle( + new TaskAttemptEvent(task1Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + + // map1 must be succeeded. map2 must be running + app.waitForState(mapTask1, TaskState.SUCCEEDED); + app.waitForState(mapTask2, TaskState.RUNNING); + + events = job.getTaskAttemptCompletionEvents(0, 100); + Assert.assertEquals("Expecting 1 more completion events for success", 5, + events.length); + + // Crash the app again. + app.stop(); + + // rerun + // in rerun the 1st map will be recovered from previous run + app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, + ++runCount); + conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask1 = it.next(); + mapTask2 = it.next(); + Task reduceTask = it.next(); + + // map 1 will be recovered, no need to send done + app.waitForState(mapTask1, TaskState.SUCCEEDED); + app.waitForState(mapTask2, TaskState.RUNNING); + + events = job.getTaskAttemptCompletionEvents(0, 100); + Assert.assertEquals( + "Expecting 2 completion events for killed & success of map1", 2, + events.length); + + task2Attempt = mapTask2.getAttempts().values().iterator().next(); + app.getContext() + .getEventHandler() + .handle( + new TaskAttemptEvent(task2Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + app.waitForState(mapTask2, TaskState.SUCCEEDED); + + events = job.getTaskAttemptCompletionEvents(0, 100); + Assert.assertEquals("Expecting 1 more completion events for success", 3, + events.length); + + app.waitForState(reduceTask, TaskState.RUNNING); + TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator() + .next(); + app.getContext() + .getEventHandler() + .handle( + new TaskAttemptEvent(task3Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + app.waitForState(reduceTask, TaskState.SUCCEEDED); + + events = job.getTaskAttemptCompletionEvents(0, 100); + Assert.assertEquals("Expecting 1 more completion events for success", 4, + events.length); + + // job succeeds + app.waitForState(job, JobState.SUCCEEDED); + } @Test public void testJobError() throws Exception { @@ -194,10 +357,6 @@ public class TestMRApp { ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob); return spiedJob; } - - JobImpl getSpiedJob() { - return this.spiedJob; - } } @Test @@ -232,6 +391,21 @@ public class TestMRApp { TypeConverter.fromYarn(state); } } + + private final class MRAppWithHistory extends MRApp { + public MRAppWithHistory(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart, int startCount) { + super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); + } + + @Override + protected EventHandler createJobHistoryHandler( + AppContext context) { + JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, + getStartCount()); + return eventHandler; + } + } public static void main(String[] args) throws Exception { TestMRApp t = new TestMRApp(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index a7f42eed34f..98bc020ecb4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -46,9 +46,11 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; @@ -594,6 +596,88 @@ public class TestRMContainerAllocator { Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } + + @Test + public void testUpdatedNodes() throws Exception { + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob); + + // add resources to scheduler + MockNM nm1 = rm.registerNode("h1:1234", 10240); + MockNM nm2 = rm.registerNode("h2:1234", 10240); + dispatcher.await(); + + // create the map container request + ContainerRequestEvent event = createReq(jobId, 1, 1024, + new String[] { "h1" }); + allocator.sendRequest(event); + TaskAttemptId attemptId = event.getAttemptID(); + + TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); + when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId()); + Task mockTask = mock(Task.class); + when(mockTask.getAttempt(attemptId)).thenReturn(mockTaskAttempt); + when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask); + + // this tells the scheduler about the requests + List assigned = allocator.schedule(); + dispatcher.await(); + + nm1.nodeHeartbeat(true); + dispatcher.await(); + // get the assignment + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(1, assigned.size()); + Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId()); + // no updated nodes reported + Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); + Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); + + // mark nodes bad + nm1.nodeHeartbeat(false); + nm2.nodeHeartbeat(false); + dispatcher.await(); + + // schedule response returns updated nodes + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(0, assigned.size()); + // updated nodes are reported + Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); + Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); + Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); + Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); + allocator.getJobUpdatedNodeEvents().clear(); + allocator.getTaskAttemptKillEvents().clear(); + + assigned = allocator.schedule(); + dispatcher.await(); + Assert.assertEquals(0, assigned.size()); + // no updated nodes reported + Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); + Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); + } @Test public void testBlackListedNodes() throws Exception { @@ -1100,7 +1184,10 @@ public class TestRMContainerAllocator { private static class MyContainerAllocator extends RMContainerAllocator { static final List events = new ArrayList(); - + static final List taskAttemptKillEvents + = new ArrayList(); + static final List jobUpdatedNodeEvents + = new ArrayList(); private MyResourceManager rm; private static AppContext createAppContext( @@ -1119,6 +1206,10 @@ public class TestRMContainerAllocator { // Only capture interesting events. if (event instanceof TaskAttemptContainerAssignedEvent) { events.add((TaskAttemptContainerAssignedEvent) event); + } else if (event instanceof TaskAttemptKillEvent) { + taskAttemptKillEvents.add((TaskAttemptKillEvent)event); + } else if (event instanceof JobUpdatedNodesEvent) { + jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event); } } }); @@ -1202,6 +1293,14 @@ public class TestRMContainerAllocator { events.clear(); return result; } + + List getTaskAttemptKillEvents() { + return taskAttemptKillEvents; + } + + List getJobUpdatedNodeEvents() { + return jobUpdatedNodeEvents; + } @Override protected void startAllocatorThread() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index a5dae84062e..80c48233c2f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -528,6 +529,11 @@ public class TestRuntimeEstimators { dispatcher.getEventHandler().handle(event); } + @Override + public NodeId getNodeId() throws UnsupportedOperationException{ + throw new UnsupportedOperationException(); + } + @Override public TaskAttemptId getID() { return myAttemptID; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index 34eb59449cd..32240f888f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -282,9 +282,12 @@ public class JobHistoryParser { if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId())) { // the failed attempt is the one that made this task successful - // so its no longer successful + // so its no longer successful. Reset fields set in + // handleTaskFinishedEvent() + taskInfo.counters = null; + taskInfo.finishTime = -1; taskInfo.status = null; - // not resetting the other fields set in handleTaskFinishedEvent() + taskInfo.successfulAttemptId = null; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index 84ec23e63bf..8f8e77615cd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; public class CompletedTaskAttempt implements TaskAttempt { @@ -57,6 +58,11 @@ public class CompletedTaskAttempt implements TaskAttempt { } } + @Override + public NodeId getNodeId() throws UnsupportedOperationException{ + throw new UnsupportedOperationException(); + } + @Override public ContainerId getAssignedContainerID() { return attemptInfo.getContainerId(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java index 31916c6be8a..7345c258c7c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java @@ -38,5 +38,9 @@ public enum NodeState { LOST, /** Node has rebooted */ - REBOOTED -} \ No newline at end of file + REBOOTED; + + public boolean isUnusable() { + return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST); + } +}