Merged MAPREDUCE-3921 from trunk. MR AM should act on node health status changes. Contributed by Bikas Saha
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1349066 13f79535-47bb-0310-9956-ffa450edef68
parent 9d83af831b
commit 2846979f16
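For orientation before the diff: on each allocate heartbeat the ResourceManager can report node health changes. The allocator forwards those reports to the job as a JobUpdatedNodesEvent, kills attempts still running on unusable nodes, and the job also kills and reschedules map attempts that had already succeeded on such nodes, since their map output may no longer be fetchable. Below is a minimal, self-contained sketch of that bookkeeping; SimpleNodeState and NodeHealthTracker are illustrative stand-ins, not the real Hadoop classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for org.apache.hadoop.yarn.api.records.NodeState.
enum SimpleNodeState {
  RUNNING, UNHEALTHY, DECOMMISSIONED, LOST;
  boolean isUnusable() { return this != RUNNING; }
}

// Sketch of the bookkeeping the patch adds: remember where map attempts
// succeeded, and when a node turns unusable, return the attempts to re-run.
class NodeHealthTracker {
  private final Map<String, List<String>> succeededMapAttemptsByNode =
      new HashMap<String, List<String>>();

  void recordMapSuccess(String nodeId, String attemptId) {
    List<String> ids = succeededMapAttemptsByNode.get(nodeId);
    if (ids == null) {
      ids = new ArrayList<String>();
      succeededMapAttemptsByNode.put(nodeId, ids);
    }
    ids.add(attemptId);
  }

  // Called for each node report received on the allocate heartbeat.
  List<String> attemptsToRerun(String nodeId, SimpleNodeState state) {
    if (!state.isUnusable()) {
      return new ArrayList<String>();
    }
    List<String> ids = succeededMapAttemptsByNode.get(nodeId);
    return ids == null ? new ArrayList<String>() : new ArrayList<String>(ids);
  }
}

Only map attempts need the retroactive treatment because their output stays on the local disks of the node that ran them; reduces are only affected while they are still running.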
@@ -14,6 +14,9 @@ Release 2.0.1-alpha - UNRELEASED
 MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
 (tomwhite)

+MAPREDUCE-3921. MR AM should act on node health status changes.
+(Bikas Saha via sseth)
+
 OPTIMIZATIONS

 BUG FIXES
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;

 /**

@@ -54,6 +55,11 @@ public interface TaskAttempt {
 */
 String getAssignedContainerMgrAddress();

+/**
+ * @return node's id if a container is assigned, otherwise null.
+ */
+NodeId getNodeId();
+
 /**
 * @return node's http address if a container is assigned, otherwise null.
 */
@@ -44,5 +44,9 @@ public enum JobEventType {
 JOB_COUNTER_UPDATE,

 //Producer:TaskAttemptListener
-JOB_TASK_ATTEMPT_FETCH_FAILURE
+JOB_TASK_ATTEMPT_FETCH_FAILURE,
+
+//Producer:RMContainerAllocator
+JOB_UPDATED_NODES

 }
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+
+public class JobUpdatedNodesEvent extends JobEvent {
+
+private final List<NodeReport> updatedNodes;
+public JobUpdatedNodesEvent(JobId jobId, List<NodeReport> updatedNodes) {
+super(jobId, JobEventType.JOB_UPDATED_NODES);
+this.updatedNodes = updatedNodes;
+}
+
+public List<NodeReport> getUpdatedNodes() {
+return updatedNodes;
+}
+
+}
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptKillEvent extends TaskAttemptEvent {
+
+private final String message;
+
+public TaskAttemptKillEvent(TaskAttemptId attemptID,
+String message) {
+super(attemptID, TaskAttemptEventType.TA_KILL);
+this.message = message;
+}
+
+public String getMessage() {
+return message;
+}
+}
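Both new event classes above follow the project's existing pattern: the event names its type via an enum passed to the super constructor and carries a small payload for the handler. A hypothetical miniature of that pattern, with made-up names purely for illustration:

// Hypothetical miniature of the typed-event-with-payload pattern.
enum MiniEventType { NODE_UPDATE, KILL_ATTEMPT }

class MiniEvent {
  private final MiniEventType type;
  MiniEvent(MiniEventType type) { this.type = type; }
  MiniEventType getType() { return type; }
}

// Payload-carrying subclass, analogous to TaskAttemptKillEvent's message.
class KillWithMessageEvent extends MiniEvent {
  private final String message;
  KillWithMessageEvent(String message) {
    super(MiniEventType.KILL_ATTEMPT);
    this.message = message;
  }
  String getMessage() { return message; }
}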
@@ -77,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;

@@ -85,8 +86,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;

@@ -100,6 +103,9 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -148,6 +154,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 private final Object tasksSyncHandle = new Object();
 private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
 private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
+/**
+ * maps nodes to tasks that have run on those nodes
+ */
+private final HashMap<NodeId, List<TaskAttemptId>>
+nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TaskAttemptId>>();
+
 private final EventHandler eventHandler;
 private final MRAppMetrics metrics;
 private final String userName;

@@ -194,6 +206,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 new TaskAttemptCompletedEventTransition();
 private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
 new CounterUpdateTransition();
+private static final UpdatedNodesTransition UPDATED_NODES_TRANSITION =
+new UpdatedNodesTransition();

 protected static final
 StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
@@ -218,6 +232,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 .addTransition(JobState.NEW, JobState.ERROR,
 JobEventType.INTERNAL_ERROR,
 INTERNAL_ERROR_TRANSITION)
+// Ignore-able events
+.addTransition(JobState.NEW, JobState.NEW,
+JobEventType.JOB_UPDATED_NODES)

 // Transitions from INITED state
 .addTransition(JobState.INITED, JobState.INITED,

@@ -234,6 +251,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 .addTransition(JobState.INITED, JobState.ERROR,
 JobEventType.INTERNAL_ERROR,
 INTERNAL_ERROR_TRANSITION)
+// Ignore-able events
+.addTransition(JobState.INITED, JobState.INITED,
+JobEventType.JOB_UPDATED_NODES)

 // Transitions from RUNNING state
 .addTransition(JobState.RUNNING, JobState.RUNNING,

@@ -251,6 +271,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 new JobNoTasksCompletedTransition())
 .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
 JobEventType.JOB_KILL, new KillTasksTransition())
+.addTransition(JobState.RUNNING, JobState.RUNNING,
+JobEventType.JOB_UPDATED_NODES,
+UPDATED_NODES_TRANSITION)
 .addTransition(JobState.RUNNING, JobState.RUNNING,
 JobEventType.JOB_MAP_TASK_RESCHEDULED,
 new MapTaskRescheduledTransition())

@@ -288,6 +311,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 // Ignore-able events
 .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
 EnumSet.of(JobEventType.JOB_KILL,
+JobEventType.JOB_UPDATED_NODES,
 JobEventType.JOB_MAP_TASK_RESCHEDULED,
 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))

@@ -304,6 +328,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 // Ignore-able events
 .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
 EnumSet.of(JobEventType.JOB_KILL,
+JobEventType.JOB_UPDATED_NODES,
 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))

 // Transitions from FAILED state

@@ -319,6 +344,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 // Ignore-able events
 .addTransition(JobState.FAILED, JobState.FAILED,
 EnumSet.of(JobEventType.JOB_KILL,
+JobEventType.JOB_UPDATED_NODES,
 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))

 // Transitions from KILLED state

@@ -334,6 +360,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 // Ignore-able events
 .addTransition(JobState.KILLED, JobState.KILLED,
 EnumSet.of(JobEventType.JOB_KILL,
+JobEventType.JOB_UPDATED_NODES,
 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))

 // No transitions from INTERNAL_ERROR state. Ignore all.

@@ -346,6 +373,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
 JobEventType.JOB_MAP_TASK_RESCHEDULED,
 JobEventType.JOB_DIAGNOSTIC_UPDATE,
+JobEventType.JOB_UPDATED_NODES,
 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
 JobEventType.INTERNAL_ERROR))
 .addTransition(JobState.ERROR, JobState.ERROR,
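The transitions above register JOB_UPDATED_NODES in two ways: while RUNNING it is routed to UPDATED_NODES_TRANSITION, and in every other state it is listed as ignorable so that a late node report cannot crash the state machine with an InvalidStateTransitonException. A rough sketch of that "handle here, ignore elsewhere" idea, with hypothetical names:

// Hedged sketch (hypothetical names): only the RUNNING arc does real work;
// NEW, INITED and the terminal states simply swallow the event.
enum SketchJobState { NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }

class NodeUpdateRouting {
  static boolean shouldActOnNodeUpdate(SketchJobState state) {
    return state == SketchJobState.RUNNING;
  }
}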
@@ -925,6 +953,24 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 return isChainJob;
 }

+private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
+// rerun previously successful map tasks
+List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
+if(taskAttemptIdList != null) {
+String mesg = "TaskAttempt killed because it ran on unusable node "
++ nodeId;
+for(TaskAttemptId id : taskAttemptIdList) {
+if(TaskType.MAP == id.getTaskId().getTaskType()) {
+// reschedule only map tasks because their outputs maybe unusable
+LOG.info(mesg + ". AttemptId:" + id);
+eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+}
+}
+}
+// currently running task attempts on unusable nodes are handled in
+// RMContainerAllocator
+}
+
 /*
 private int getBlockSize() {
 String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);

@@ -1269,18 +1315,37 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 tce.setEventId(job.taskAttemptCompletionEvents.size());
 job.taskAttemptCompletionEvents.add(tce);
+
+TaskAttemptId attemptId = tce.getAttemptId();
+TaskId taskId = attemptId.getTaskId();
 //make the previous completion event as obsolete if it exists
 Object successEventNo =
-job.successAttemptCompletionEventNoMap.remove(tce.getAttemptId().getTaskId());
+job.successAttemptCompletionEventNoMap.remove(taskId);
 if (successEventNo != null) {
 TaskAttemptCompletionEvent successEvent =
 job.taskAttemptCompletionEvents.get((Integer) successEventNo);
 successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
 }
+
+// if this attempt is not successful then why is the previous successful
+// attempt being removed above - MAPREDUCE-4330
 if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
-job.successAttemptCompletionEventNoMap.put(tce.getAttemptId().getTaskId(),
-tce.getEventId());
+job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
+// here we could have simply called Task.getSuccessfulAttempt() but
+// the event that triggers this code is sent before
+// Task.successfulAttempt is set and so there is no guarantee that it
+// will be available now
+Task task = job.tasks.get(taskId);
+TaskAttempt attempt = task.getAttempt(attemptId);
+NodeId nodeId = attempt.getNodeId();
+assert (nodeId != null); // node must exist for a successful event
+List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
+.get(nodeId);
+if (taskAttemptIdList == null) {
+taskAttemptIdList = new ArrayList<TaskAttemptId>();
+job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
+}
+taskAttemptIdList.add(attempt.getID());
 }
 }
 }

@@ -1461,6 +1526,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 }
 }

+private static class UpdatedNodesTransition implements
+SingleArcTransition<JobImpl, JobEvent> {
+@Override
+public void transition(JobImpl job, JobEvent event) {
+JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event;
+for(NodeReport nr: updateEvent.getUpdatedNodes()) {
+NodeState nodeState = nr.getNodeState();
+if(nodeState.isUnusable()) {
+// act on the updates
+job.actOnUnusableNode(nr.getNodeId(), nodeState);
+}
+}
+}
+}
+
 private static class InternalErrorTransition implements
 SingleArcTransition<JobImpl, JobEvent> {
 @Override
@@ -84,6 +84,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -403,6 +404,10 @@ public abstract class TaskAttemptImpl implements
 TaskAttemptState.FAILED,
 TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
 new TooManyFetchFailureTransition())
+.addTransition(
+TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED,
+TaskAttemptEventType.TA_KILL,
+new KilledAfterSuccessTransition())
 .addTransition(
 TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
 TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,

@@ -410,8 +415,7 @@ public abstract class TaskAttemptImpl implements
 // Ignore-able events for SUCCEEDED state
 .addTransition(TaskAttemptState.SUCCEEDED,
 TaskAttemptState.SUCCEEDED,
-EnumSet.of(TaskAttemptEventType.TA_KILL,
-TaskAttemptEventType.TA_FAILMSG,
+EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
 TaskAttemptEventType.TA_CONTAINER_CLEANED,
 TaskAttemptEventType.TA_CONTAINER_COMPLETED))

@@ -818,6 +822,16 @@ public abstract class TaskAttemptImpl implements
 }
 }

+@Override
+public NodeId getNodeId() {
+readLock.lock();
+try {
+return containerNodeId;
+} finally {
+readLock.unlock();
+}
+}
+
 /**If container Assigned then return the node's address, otherwise null.
 */
 @Override
@@ -999,7 +1013,7 @@ public abstract class TaskAttemptImpl implements
 }

 private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
-TaskAttemptImpl taskAttempt) {
+TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
 TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
 JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());

@@ -1007,16 +1021,22 @@ public abstract class TaskAttemptImpl implements

 if (taskType == TaskType.MAP) {
 jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
+if(!taskAlreadyCompleted) {
+// dont double count the elapsed time
 jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+}
 } else {
 jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
+if(!taskAlreadyCompleted) {
+// dont double count the elapsed time
 jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+}
 }
 return jce;
 }

 private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
-TaskAttemptImpl taskAttempt) {
+TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
 TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
 JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());

@@ -1024,11 +1044,17 @@ public abstract class TaskAttemptImpl implements

 if (taskType == TaskType.MAP) {
 jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
+if(!taskAlreadyCompleted) {
+// dont double count the elapsed time
 jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+}
 } else {
 jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
+if(!taskAlreadyCompleted) {
+// dont double count the elapsed time
 jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+}
 }
 return jce;
 }
@@ -1259,10 +1285,10 @@ public abstract class TaskAttemptImpl implements
 finalState);
 if(finalState == TaskAttemptState.FAILED) {
 taskAttempt.eventHandler
-.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
 } else if(finalState == TaskAttemptState.KILLED) {
 taskAttempt.eventHandler
-.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
+.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
 }
 taskAttempt.eventHandler.handle(new JobHistoryEvent(
 taskAttempt.attemptId.getTaskId().getJobId(), tauce));

@@ -1394,7 +1420,7 @@ public abstract class TaskAttemptImpl implements

 if (taskAttempt.getLaunchTime() != 0) {
 taskAttempt.eventHandler
-.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
 TaskAttemptUnsuccessfulCompletionEvent tauce =
 createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
 TaskAttemptState.FAILED);

@@ -1463,7 +1489,7 @@ public abstract class TaskAttemptImpl implements

 if (taskAttempt.getLaunchTime() != 0) {
 taskAttempt.eventHandler
-.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+.handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
 TaskAttemptUnsuccessfulCompletionEvent tauce =
 createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
 TaskAttemptState.FAILED);
@@ -1478,6 +1504,32 @@ public abstract class TaskAttemptImpl implements
 }
 }

+private static class KilledAfterSuccessTransition implements
+SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+@SuppressWarnings("unchecked")
+@Override
+public void transition(TaskAttemptImpl taskAttempt,
+TaskAttemptEvent event) {
+TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
+//add to diagnostic
+taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+
+// not setting a finish time since it was set on success
+assert (taskAttempt.getFinishTime() != 0);
+
+assert (taskAttempt.getLaunchTime() != 0);
+taskAttempt.eventHandler
+.handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
+TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+taskAttempt, TaskAttemptState.KILLED);
+taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+.getTaskId().getJobId(), tauce));
+taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+}
+}
+
 private static class KilledTransition implements
 SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {

@@ -1489,7 +1541,7 @@ public abstract class TaskAttemptImpl implements
 taskAttempt.setFinishTime();
 if (taskAttempt.getLaunchTime() != 0) {
 taskAttempt.eventHandler
-.handle(createJobCounterUpdateEventTAKilled(taskAttempt));
+.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
 TaskAttemptUnsuccessfulCompletionEvent tauce =
 createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
 TaskAttemptState.KILLED);
@@ -191,13 +191,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
 EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
 TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+.addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
+TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
 // Ignore-able transitions.
 .addTransition(
 TaskState.SUCCEEDED, TaskState.SUCCEEDED,
-EnumSet.of(TaskEventType.T_KILL,
-TaskEventType.T_ADD_SPEC_ATTEMPT,
-TaskEventType.T_ATTEMPT_LAUNCHED,
-TaskEventType.T_ATTEMPT_KILLED))
+EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+TaskEventType.T_ATTEMPT_LAUNCHED))

 // Transitions from FAILED state
 .addTransition(TaskState.FAILED, TaskState.FAILED,
@@ -629,7 +630,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 // always called inside a transition, in turn inside the Write Lock
 private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
 TaskAttemptCompletionEventStatus status) {
-finishedAttempts++;
 TaskAttempt attempt = attempts.get(attemptId);
 //raise the completion event only if the container is assigned
 // to nextAttemptNumber

@@ -682,6 +682,11 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 return taskFailedEvent;
 }

+private static void unSucceed(TaskImpl task) {
+task.commitAttempt = null;
+task.successfulAttempt = null;
+}
+
 /**
 * @return a String representation of the splits.
 *
@@ -755,6 +760,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 task.handleTaskAttemptCompletion(
 ((TaskTAttemptEvent) event).getTaskAttemptID(),
 TaskAttemptCompletionEventStatus.SUCCEEDED);
+task.finishedAttempts++;
 --task.numberUncompletedAttempts;
 task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
 task.eventHandler.handle(new JobTaskEvent(

@@ -790,6 +796,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 task.handleTaskAttemptCompletion(
 ((TaskTAttemptEvent) event).getTaskAttemptID(),
 TaskAttemptCompletionEventStatus.KILLED);
+task.finishedAttempts++;
 --task.numberUncompletedAttempts;
 if (task.successfulAttempt == null) {
 task.addAndScheduleAttempt();

@@ -808,6 +815,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 task.handleTaskAttemptCompletion(
 ((TaskTAttemptEvent) event).getTaskAttemptID(),
 TaskAttemptCompletionEventStatus.KILLED);
+task.finishedAttempts++;
 // check whether all attempts are finished
 if (task.finishedAttempts == task.attempts.size()) {
 if (task.historyTaskStartGenerated) {

@@ -845,6 +853,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 attempt.getAssignedContainerMgrAddress()));
 }

+task.finishedAttempts++;
 if (task.failedAttempts < task.maxAttempts) {
 task.handleTaskAttemptCompletion(
 ((TaskTAttemptEvent) event).getTaskAttemptID(),
@@ -880,12 +889,6 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 protected TaskState getDefaultState(Task task) {
 return task.getState();
 }
-
-protected void unSucceed(TaskImpl task) {
-++task.numberUncompletedAttempts;
-task.commitAttempt = null;
-task.successfulAttempt = null;
-}
 }

 private static class MapRetroactiveFailureTransition

@@ -908,6 +911,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 // fails, we have to let AttemptFailedTransition.transition
 // believe that there's no redundancy.
 unSucceed(task);
+// fake increase in Uncomplete attempts for super.transition
+++task.numberUncompletedAttempts;
 return super.transition(task, event);
 }

@@ -917,6 +922,45 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 }
 }

+private static class MapRetroactiveKilledTransition implements
+MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+@Override
+public TaskState transition(TaskImpl task, TaskEvent event) {
+// verify that this occurs only for map task
+// TODO: consider moving it to MapTaskImpl
+if (!TaskType.MAP.equals(task.getType())) {
+LOG.error("Unexpected event for REDUCE task " + event.getType());
+task.internalError(event.getType());
+}
+
+TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+TaskAttemptId attemptId = attemptEvent.getTaskAttemptID();
+if(task.successfulAttempt == attemptId) {
+// successful attempt is now killed. reschedule
+// tell the job about the rescheduling
+unSucceed(task);
+task.handleTaskAttemptCompletion(
+attemptId,
+TaskAttemptCompletionEventStatus.KILLED);
+task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
+// typically we are here because this map task was run on a bad node and
+// we want to reschedule it on a different node.
+// Depending on whether there are previous failed attempts or not this
+// can SCHEDULE or RESCHEDULE the container allocate request. If this
+// SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
+// from the map splitInfo. So the bad node might be sent as a location
+// to the RM. But the RM would ignore that just like it would ignore
+// currently pending container requests affinitized to bad nodes.
+task.addAndScheduleAttempt();
+return TaskState.SCHEDULED;
+} else {
+// nothing to do
+return TaskState.SUCCEEDED;
+}
+}
+}
+
 private static class KillNewTransition
 implements SingleArcTransition<TaskImpl, TaskEvent> {
 @Override
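The retroactive-kill arc takes a SUCCEEDED map task back to SCHEDULED when its successful attempt is later killed (for example because its node became unusable), while a kill of some other, non-successful attempt leaves the task SUCCEEDED. A compact sketch of that decision, using hypothetical names:

// Hedged sketch of the MapRetroactiveKilledTransition decision.
enum SketchTaskState { SCHEDULED, SUCCEEDED }

class RetroactiveKill {
  static SketchTaskState onAttemptKilled(String successfulAttemptId,
      String killedAttemptId) {
    if (successfulAttemptId != null
        && successfulAttemptId.equals(killedAttemptId)) {
      // The attempt that produced the map output is gone: forget the success
      // and go back to SCHEDULED so a new attempt is requested.
      return SketchTaskState.SCHEDULED;
    }
    // Some other attempt was killed; the task output is still valid.
    return SketchTaskState.SUCCEEDED;
  }
}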
@@ -966,6 +1010,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 public void transition(TaskImpl task, TaskEvent event) {
 task.metrics.launchedTask(task);
 task.metrics.runningTask(task);
+
 }
 }
 }
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;

@@ -46,19 +47,27 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -584,6 +593,8 @@ public class RMContainerAllocator extends RMContainerRequestor
 //Called on each allocation. Will know about newly blacklisted/added hosts.
 computeIgnoreBlacklisting();

+handleUpdatedNodes(response);
+
 for (ContainerStatus cont : finishedContainers) {
 LOG.info("Received completed container " + cont.getContainerId());
 TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());

@@ -605,6 +616,44 @@ public class RMContainerAllocator extends RMContainerRequestor
 return newContainers;
 }

+@SuppressWarnings("unchecked")
+private void handleUpdatedNodes(AMResponse response) {
+// send event to the job about on updated nodes
+List<NodeReport> updatedNodes = response.getUpdatedNodes();
+if (!updatedNodes.isEmpty()) {
+
+// send event to the job to act upon completed tasks
+eventHandler.handle(new JobUpdatedNodesEvent(getJob().getID(),
+updatedNodes));
+
+// act upon running tasks
+HashSet<NodeId> unusableNodes = new HashSet<NodeId>();
+for (NodeReport nr : updatedNodes) {
+NodeState nodeState = nr.getNodeState();
+if (nodeState.isUnusable()) {
+unusableNodes.add(nr.getNodeId());
+}
+}
+for (int i = 0; i < 2; ++i) {
+HashMap<TaskAttemptId, Container> taskSet = i == 0 ? assignedRequests.maps
+: assignedRequests.reduces;
+// kill running containers
+for (Map.Entry<TaskAttemptId, Container> entry : taskSet.entrySet()) {
+TaskAttemptId tid = entry.getKey();
+NodeId taskAttemptNodeId = entry.getValue().getNodeId();
+if (unusableNodes.contains(taskAttemptNodeId)) {
+LOG.info("Killing taskAttempt:" + tid
++ " because it is running on unusable node:"
++ taskAttemptNodeId);
+eventHandler.handle(new TaskAttemptKillEvent(tid,
+"TaskAttempt killed because it ran on unusable node"
++ taskAttemptNodeId));
+}
+}
+}
+}
+}
+
 @Private
 public int getMemLimit() {
 int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
@@ -743,7 +792,6 @@ public class RMContainerAllocator extends RMContainerRequestor
 boolean blackListed = false;
 ContainerRequest assigned = null;

-ContainerId allocatedContainerId = allocated.getId();
 if (isAssignable) {
 // do not assign if allocated container is on a
 // blacklisted host

@@ -790,7 +838,7 @@ public class RMContainerAllocator extends RMContainerRequestor
 eventHandler.handle(new TaskAttemptContainerAssignedEvent(
 assigned.attemptID, allocated, applicationACLs));

-assignedRequests.add(allocatedContainerId, assigned.attemptID);
+assignedRequests.add(allocated, assigned.attemptID);

 if (LOG.isDebugEnabled()) {
 LOG.info("Assigned container (" + allocated + ") "

@@ -811,7 +859,7 @@ public class RMContainerAllocator extends RMContainerRequestor
 // or if we could not assign it
 if (blackListed || assigned == null) {
 containersReleased++;
-release(allocatedContainerId);
+release(allocated.getId());
 }
 }
 }
@@ -974,20 +1022,20 @@ public class RMContainerAllocator extends RMContainerRequestor
 private class AssignedRequests {
 private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
 new HashMap<ContainerId, TaskAttemptId>();
-private final LinkedHashMap<TaskAttemptId, ContainerId> maps =
-new LinkedHashMap<TaskAttemptId, ContainerId>();
-private final LinkedHashMap<TaskAttemptId, ContainerId> reduces =
-new LinkedHashMap<TaskAttemptId, ContainerId>();
+private final LinkedHashMap<TaskAttemptId, Container> maps =
+new LinkedHashMap<TaskAttemptId, Container>();
+private final LinkedHashMap<TaskAttemptId, Container> reduces =
+new LinkedHashMap<TaskAttemptId, Container>();
 private final Set<TaskAttemptId> preemptionWaitingReduces =
 new HashSet<TaskAttemptId>();

-void add(ContainerId containerId, TaskAttemptId tId) {
-LOG.info("Assigned container " + containerId.toString() + " to " + tId);
-containerToAttemptMap.put(containerId, tId);
+void add(Container container, TaskAttemptId tId) {
+LOG.info("Assigned container " + container.getId().toString() + " to " + tId);
+containerToAttemptMap.put(container.getId(), tId);
 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-maps.put(tId, containerId);
+maps.put(tId, container);
 } else {
-reduces.put(tId, containerId);
+reduces.put(tId, container);
 }
 }

@@ -1017,9 +1065,9 @@ public class RMContainerAllocator extends RMContainerRequestor
 boolean remove(TaskAttemptId tId) {
 ContainerId containerId = null;
 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-containerId = maps.remove(tId);
+containerId = maps.remove(tId).getId();
 } else {
-containerId = reduces.remove(tId);
+containerId = reduces.remove(tId).getId();
 if (containerId != null) {
 boolean preempted = preemptionWaitingReduces.remove(tId);
 if (preempted) {

@@ -1039,11 +1087,19 @@ public class RMContainerAllocator extends RMContainerRequestor
 return containerToAttemptMap.get(cId);
 }

+NodeId getNodeId(TaskAttemptId tId) {
+if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
+return maps.get(tId).getNodeId();
+} else {
+return reduces.get(tId).getNodeId();
+}
+}
+
 ContainerId get(TaskAttemptId tId) {
 if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-return maps.get(tId);
+return maps.get(tId).getId();
 } else {
-return reduces.get(tId);
+return reduces.get(tId).getId();
 }
 }
 }
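The AssignedRequests maps above now keep the whole Container rather than just its ContainerId, because matching running attempts against unusable nodes needs the container's NodeId. A minimal stand-in showing why the richer value type is needed (hypothetical names, not the YARN records):

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a YARN Container: an id plus the node it runs on.
class MiniContainer {
  final String containerId;
  final String nodeId;
  MiniContainer(String containerId, String nodeId) {
    this.containerId = containerId;
    this.nodeId = nodeId;
  }
}

class MiniAssignedRequests {
  private final Map<String, MiniContainer> attemptToContainer =
      new HashMap<String, MiniContainer>();

  void add(MiniContainer container, String attemptId) {
    attemptToContainer.put(attemptId, container);
  }

  // Possible only because the map stores containers, not bare ids.
  String nodeOf(String attemptId) {
    MiniContainer c = attemptToContainer.get(attemptId);
    return c == null ? null : c.nodeId;
  }
}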
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;

@@ -231,6 +232,11 @@ public class MockJobs extends MockApps {
 final List<String> diags = Lists.newArrayList();
 diags.add(DIAGS.next());
 return new TaskAttempt() {
+@Override
+public NodeId getNodeId() throws UnsupportedOperationException{
+throw new UnsupportedOperationException();
+}
+
 @Override
 public TaskAttemptId getID() {
 return taid;
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;

+import java.util.ArrayList;
 import java.util.Iterator;

 import junit.framework.Assert;

@@ -29,17 +30,26 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;

 /**
@ -161,6 +171,159 @@ public class TestMRApp {
|
||||||
app.waitForState(job, JobState.SUCCEEDED);
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The test verifies that the AM re-runs maps that have run on bad nodes. It
|
||||||
|
* also verifies that the AM records all success/killed events so that reduces
|
||||||
|
* are notified about map output status changes. It also verifies that the
|
||||||
|
* re-run information is preserved across an AM restart.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUpdatedNodes() throws Exception {
|
||||||
|
int runCount = 0;
|
||||||
|
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
|
||||||
|
true, ++runCount);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// after half of the map completion, reduce will start
|
||||||
|
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
|
||||||
|
// uberization forces full slowstart (1.0), so disable that
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
Job job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
|
||||||
|
Iterator<Task> it = job.getTasks().values().iterator();
|
||||||
|
Task mapTask1 = it.next();
|
||||||
|
Task mapTask2 = it.next();
|
||||||
|
|
||||||
|
// all maps must be running
|
||||||
|
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
|
||||||
|
.next();
|
||||||
|
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
|
||||||
|
.next();
|
||||||
|
NodeId node1 = task1Attempt.getNodeId();
|
||||||
|
NodeId node2 = task2Attempt.getNodeId();
|
||||||
|
Assert.assertEquals(node1, node2);
|
||||||
|
|
||||||
|
// send the done signal to the task
|
||||||
|
app.getContext()
|
||||||
|
.getEventHandler()
|
||||||
|
.handle(
|
||||||
|
new TaskAttemptEvent(task1Attempt.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
app.getContext()
|
||||||
|
.getEventHandler()
|
||||||
|
.handle(
|
||||||
|
new TaskAttemptEvent(task2Attempt.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
// all maps must have succeeded
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0,
|
||||||
|
100);
|
||||||
|
Assert.assertEquals("Expecting 2 completion events for success", 2,
|
||||||
|
events.length);
|
||||||
|
|
||||||
|
// send updated nodes info
|
||||||
|
ArrayList<NodeReport> updatedNodes = new ArrayList<NodeReport>();
|
||||||
|
NodeReport nr = RecordFactoryProvider.getRecordFactory(null)
|
||||||
|
.newRecordInstance(NodeReport.class);
|
||||||
|
nr.setNodeId(node1);
|
||||||
|
nr.setNodeState(NodeState.UNHEALTHY);
|
||||||
|
updatedNodes.add(nr);
|
||||||
|
app.getContext().getEventHandler()
|
||||||
|
.handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes));
|
||||||
|
|
||||||
|
app.waitForState(task1Attempt, TaskAttemptState.KILLED);
|
||||||
|
app.waitForState(task2Attempt, TaskAttemptState.KILLED);
|
||||||
|
|
||||||
|
events = job.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
Assert.assertEquals("Expecting 2 more completion events for killed", 4,
|
||||||
|
events.length);
|
||||||
|
|
||||||
|
// all maps must be back to running
|
||||||
|
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
|
||||||
|
itr.next();
|
||||||
|
task1Attempt = itr.next();
|
||||||
|
|
||||||
|
// send the done signal to the task
|
||||||
|
app.getContext()
|
||||||
|
.getEventHandler()
|
||||||
|
.handle(
|
||||||
|
new TaskAttemptEvent(task1Attempt.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
// map1 must have succeeded; map2 must still be running
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
events = job.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
Assert.assertEquals("Expecting 1 more completion events for success", 5,
|
||||||
|
events.length);
|
||||||
|
|
||||||
|
// Crash the app.
|
||||||
|
app.stop();
|
||||||
|
|
||||||
|
// rerun
|
||||||
|
// in rerun the 1st map will be recovered from previous run
|
||||||
|
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||||
|
it = job.getTasks().values().iterator();
|
||||||
|
mapTask1 = it.next();
|
||||||
|
mapTask2 = it.next();
|
||||||
|
Task reduceTask = it.next();
|
||||||
|
|
||||||
|
// map 1 will be recovered, no need to send done
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
events = job.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Expecting 2 completion events for killed & success of map1", 2,
|
||||||
|
events.length);
|
||||||
|
|
||||||
|
task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
||||||
|
app.getContext()
|
||||||
|
.getEventHandler()
|
||||||
|
.handle(
|
||||||
|
new TaskAttemptEvent(task2Attempt.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
events = job.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
Assert.assertEquals("Expecting 1 more completion events for success", 3,
|
||||||
|
events.length);
|
||||||
|
|
||||||
|
app.waitForState(reduceTask, TaskState.RUNNING);
|
||||||
|
TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator()
|
||||||
|
.next();
|
||||||
|
app.getContext()
|
||||||
|
.getEventHandler()
|
||||||
|
.handle(
|
||||||
|
new TaskAttemptEvent(task3Attempt.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
app.waitForState(reduceTask, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
events = job.getTaskAttemptCompletionEvents(0, 100);
|
||||||
|
Assert.assertEquals("Expecting 1 more completion events for success", 4,
|
||||||
|
events.length);
|
||||||
|
|
||||||
|
// job succeeds
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
}
|
||||||
|
|
||||||
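The test above exercises the core of this change: when the AM is told that a node has gone bad, every map attempt that ran on it is killed and re-scheduled, and the killed/succeeded completion events are recorded so reduces know to re-fetch the map output. As orientation only, not the actual JobImpl/RMContainerAllocator code, the handling can be pictured as the sketch below; the class name, method name and eventHandler parameter are hypothetical, while the record and event types are the ones this patch and test use.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.event.EventHandler;

final class UpdatedNodesHandlingSketch {

  // Kill every map attempt whose container ran on a node that the updated
  // reports mark as unusable, so the map is re-run elsewhere and a KILLED
  // completion event is published for the reduces.
  static void handleUpdatedNodes(List<NodeReport> updatedNodes,
      Iterable<TaskAttempt> mapAttempts,
      EventHandler<TaskAttemptEvent> eventHandler) {
    Set<NodeId> unusable = new HashSet<NodeId>();
    for (NodeReport report : updatedNodes) {
      if (report.getNodeState().isUnusable()) {
        unusable.add(report.getNodeId());
      }
    }
    for (TaskAttempt attempt : mapAttempts) {
      NodeId nodeId = attempt.getNodeId(); // null until a container is assigned
      if (nodeId != null && unusable.contains(nodeId)) {
        eventHandler.handle(
            new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_KILL));
      }
    }
  }
}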
@Test
|
@Test
|
||||||
public void testJobError() throws Exception {
|
public void testJobError() throws Exception {
|
||||||
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
|
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
|
||||||
|
@ -194,10 +357,6 @@ public class TestMRApp {
|
||||||
((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
|
((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
|
||||||
return spiedJob;
|
return spiedJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
JobImpl getSpiedJob() {
|
|
||||||
return this.spiedJob;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -233,6 +392,21 @@ public class TestMRApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class MRAppWithHistory extends MRApp {
|
||||||
|
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
|
||||||
|
String testName, boolean cleanOnStart, int startCount) {
|
||||||
|
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||||
|
AppContext context) {
|
||||||
|
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
|
||||||
|
getStartCount());
|
||||||
|
return eventHandler;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestMRApp t = new TestMRApp();
|
TestMRApp t = new TestMRApp();
|
||||||
t.testMapReduce();
|
t.testMapReduce();
|
||||||
|
|
|
@ -46,9 +46,11 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
|
||||||
|
@ -595,6 +597,88 @@ public class TestRMContainerAllocator {
|
||||||
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
|
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdatedNodes() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MyResourceManager rm = new MyResourceManager(conf);
|
||||||
|
rm.start();
|
||||||
|
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
|
||||||
|
.getDispatcher();
|
||||||
|
|
||||||
|
// Submit the application
|
||||||
|
RMApp app = rm.submitApp(1024);
|
||||||
|
dispatcher.await();
|
||||||
|
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
|
||||||
|
amNodeManager.nodeHeartbeat(true);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId();
|
||||||
|
rm.sendAMLaunched(appAttemptId);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
||||||
|
Job mockJob = mock(Job.class);
|
||||||
|
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
|
||||||
|
appAttemptId, mockJob);
|
||||||
|
|
||||||
|
// add resources to scheduler
|
||||||
|
MockNM nm1 = rm.registerNode("h1:1234", 10240);
|
||||||
|
MockNM nm2 = rm.registerNode("h2:1234", 10240);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// create the map container request
|
||||||
|
ContainerRequestEvent event = createReq(jobId, 1, 1024,
|
||||||
|
new String[] { "h1" });
|
||||||
|
allocator.sendRequest(event);
|
||||||
|
TaskAttemptId attemptId = event.getAttemptID();
|
||||||
|
|
||||||
|
TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
|
||||||
|
when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId());
|
||||||
|
Task mockTask = mock(Task.class);
|
||||||
|
when(mockTask.getAttempt(attemptId)).thenReturn(mockTaskAttempt);
|
||||||
|
when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask);
|
||||||
|
|
||||||
|
// this tells the scheduler about the requests
|
||||||
|
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
dispatcher.await();
|
||||||
|
// get the assignment
|
||||||
|
assigned = allocator.schedule();
|
||||||
|
dispatcher.await();
|
||||||
|
Assert.assertEquals(1, assigned.size());
|
||||||
|
Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
|
||||||
|
// no updated nodes reported
|
||||||
|
Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
|
||||||
|
Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
|
||||||
|
|
||||||
|
// mark nodes bad
|
||||||
|
nm1.nodeHeartbeat(false);
|
||||||
|
nm2.nodeHeartbeat(false);
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// schedule response returns updated nodes
|
||||||
|
assigned = allocator.schedule();
|
||||||
|
dispatcher.await();
|
||||||
|
Assert.assertEquals(0, assigned.size());
|
||||||
|
// updated nodes are reported
|
||||||
|
Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
|
||||||
|
Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
|
||||||
|
Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
|
||||||
|
Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
|
||||||
|
allocator.getJobUpdatedNodeEvents().clear();
|
||||||
|
allocator.getTaskAttemptKillEvents().clear();
|
||||||
|
|
||||||
|
assigned = allocator.schedule();
|
||||||
|
dispatcher.await();
|
||||||
|
Assert.assertEquals(0, assigned.size());
|
||||||
|
// no updated nodes reported
|
||||||
|
Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
|
||||||
|
Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
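The assertions above pin down the two event types the capture hooks further below record: one JobUpdatedNodesEvent per heartbeat that reported node changes, plus one TaskAttemptKillEvent for each assigned attempt sitting on a node that became unusable. A hedged sketch of that dispatch follows; it is not the real RMContainerAllocator code, the helper name and the assignedAttempts map are invented for illustration, and TaskAttemptKillEvent taking an attempt id plus a diagnostic message is an assumption.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;

final class NodeChangeDispatchSketch {

  // Forward the raw reports to the job, then ask for the kill of every attempt
  // whose assigned node is among the reports marked unusable.
  static void reportNodeChanges(JobId jobId, List<NodeReport> updatedNodes,
      Map<TaskAttemptId, NodeId> assignedAttempts,
      EventHandler<Event> eventHandler) {
    eventHandler.handle(new JobUpdatedNodesEvent(jobId, updatedNodes));
    for (NodeReport report : updatedNodes) {
      if (!report.getNodeState().isUnusable()) {
        continue;
      }
      for (Map.Entry<TaskAttemptId, NodeId> entry : assignedAttempts.entrySet()) {
        if (report.getNodeId().equals(entry.getValue())) {
          // assumed constructor: (attempt id, diagnostic message)
          eventHandler.handle(new TaskAttemptKillEvent(entry.getKey(),
              "Container ran on unusable node " + entry.getValue()));
        }
      }
    }
  }
}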
@Test
|
@Test
|
||||||
public void testBlackListedNodes() throws Exception {
|
public void testBlackListedNodes() throws Exception {
|
||||||
|
|
||||||
|
@ -1100,7 +1184,10 @@ public class TestRMContainerAllocator {
|
||||||
private static class MyContainerAllocator extends RMContainerAllocator {
|
private static class MyContainerAllocator extends RMContainerAllocator {
|
||||||
static final List<TaskAttemptContainerAssignedEvent> events
|
static final List<TaskAttemptContainerAssignedEvent> events
|
||||||
= new ArrayList<TaskAttemptContainerAssignedEvent>();
|
= new ArrayList<TaskAttemptContainerAssignedEvent>();
|
||||||
|
static final List<TaskAttemptKillEvent> taskAttemptKillEvents
|
||||||
|
= new ArrayList<TaskAttemptKillEvent>();
|
||||||
|
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
|
||||||
|
= new ArrayList<JobUpdatedNodesEvent>();
|
||||||
private MyResourceManager rm;
|
private MyResourceManager rm;
|
||||||
|
|
||||||
private static AppContext createAppContext(
|
private static AppContext createAppContext(
|
||||||
|
@ -1119,6 +1206,10 @@ public class TestRMContainerAllocator {
|
||||||
// Only capture interesting events.
|
// Only capture interesting events.
|
||||||
if (event instanceof TaskAttemptContainerAssignedEvent) {
|
if (event instanceof TaskAttemptContainerAssignedEvent) {
|
||||||
events.add((TaskAttemptContainerAssignedEvent) event);
|
events.add((TaskAttemptContainerAssignedEvent) event);
|
||||||
|
} else if (event instanceof TaskAttemptKillEvent) {
|
||||||
|
taskAttemptKillEvents.add((TaskAttemptKillEvent)event);
|
||||||
|
} else if (event instanceof JobUpdatedNodesEvent) {
|
||||||
|
jobUpdatedNodeEvents.add((JobUpdatedNodesEvent)event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -1203,6 +1294,14 @@ public class TestRMContainerAllocator {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
|
||||||
|
return taskAttemptKillEvents;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
|
||||||
|
return jobUpdatedNodeEvents;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void startAllocatorThread() {
|
protected void startAllocatorThread() {
|
||||||
// override to NOT start thread
|
// override to NOT start thread
|
||||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.SystemClock;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -528,6 +529,11 @@ public class TestRuntimeEstimators {
|
||||||
dispatcher.getEventHandler().handle(event);
|
dispatcher.getEventHandler().handle(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeId getNodeId() throws UnsupportedOperationException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskAttemptId getID() {
|
public TaskAttemptId getID() {
|
||||||
return myAttemptID;
|
return myAttemptID;
|
||||||
|
|
|
@ -282,9 +282,12 @@ public class JobHistoryParser {
|
||||||
if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId()))
|
if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId()))
|
||||||
{
|
{
|
||||||
// the failed attempt is the one that made this task successful
|
// the failed attempt is the one that made this task successful
|
||||||
// so its no longer successful
|
// so it's no longer successful. Reset fields set in
|
||||||
|
// handleTaskFinishedEvent()
|
||||||
|
taskInfo.counters = null;
|
||||||
|
taskInfo.finishTime = -1;
|
||||||
taskInfo.status = null;
|
taskInfo.status = null;
|
||||||
// not resetting the other fields set in handleTaskFinishedEvent()
|
taskInfo.successfulAttemptId = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
public class CompletedTaskAttempt implements TaskAttempt {
|
public class CompletedTaskAttempt implements TaskAttempt {
|
||||||
|
@ -57,6 +58,11 @@ public class CompletedTaskAttempt implements TaskAttempt {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeId getNodeId() throws UnsupportedOperationException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerId getAssignedContainerID() {
|
public ContainerId getAssignedContainerID() {
|
||||||
return attemptInfo.getContainerId();
|
return attemptInfo.getContainerId();
|
||||||
|
|
|
@ -38,5 +38,9 @@ public enum NodeState {
|
||||||
LOST,
|
LOST,
|
||||||
|
|
||||||
/** Node has rebooted */
|
/** Node has rebooted */
|
||||||
REBOOTED
|
REBOOTED;
|
||||||
|
|
||||||
|
public boolean isUnusable() {
|
||||||
|
return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
|
||||||
|
}
|
||||||
}
|
}
|
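Read together with the method body above, isUnusable() gives callers a single place to ask whether containers on a node can still be trusted. A minimal illustration of the contract, plain asserts rather than a test from this patch:

// UNHEALTHY, DECOMMISSIONED and LOST are the states treated as unusable;
// every other state, including REBOOTED, reports false.
assert NodeState.UNHEALTHY.isUnusable();
assert NodeState.DECOMMISSIONED.isUnusable();
assert NodeState.LOST.isUnusable();
assert !NodeState.REBOOTED.isUnusable();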