MAPREDUCE-5043. Fetch failure processing can cause AM event queue to backup and eventually OOM (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1452372 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2013-03-04 16:32:18 +00:00
parent 84567faa92
commit ec13f1eb3a
7 changed files with 46 additions and 14 deletions

View File

@ -757,6 +757,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
shutdown (Jason Lowe via jeagles)
MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
backup and eventually OOM (Jason Lowe via bobby)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@ -37,6 +38,7 @@ public interface TaskAttempt {
List<String> getDiagnostics();
Counters getCounters();
float getProgress();
Phase getPhase();
TaskAttemptState getState();
/**

View File

@ -1672,6 +1672,20 @@ private static class TaskAttemptFetchFailureTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
//get number of shuffling reduces
int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
Task task = job.tasks.get(taskId);
if (TaskState.RUNNING.equals(task.getState())) {
for(TaskAttempt attempt : task.getAttempts().values()) {
if(attempt.getPhase() == Phase.SHUFFLE) {
shufflingReduceTasks++;
break;
}
}
}
}
JobTaskAttemptFetchFailureEvent fetchfailureEvent =
(JobTaskAttemptFetchFailureEvent) event;
for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId :
@ -1680,20 +1694,6 @@ public void transition(JobImpl job, JobEvent event) {
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
job.fetchFailuresMapping.put(mapId, fetchFailures);
//get number of shuffling reduces
int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
Task task = job.tasks.get(taskId);
if (TaskState.RUNNING.equals(task.getState())) {
for(TaskAttempt attempt : task.getAttempts().values()) {
if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
shufflingReduceTasks++;
break;
}
}
}
}
float failureRate = shufflingReduceTasks == 0 ? 1.0f :
(float) fetchFailures / shufflingReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures

View File

@ -993,6 +993,16 @@ public float getProgress() {
}
}
@Override
public Phase getPhase() {
readLock.lock();
try {
return reportedStatus.phase;
} finally {
readLock.unlock();
}
}
@Override
public TaskAttemptState getState() {
readLock.lock();

View File

@ -276,6 +276,11 @@ public float getProgress() {
return report.getProgress();
}
@Override
public Phase getPhase() {
return report.getPhase();
}
@Override
public TaskAttemptState getState() {
return report.getTaskAttemptState();

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@ -638,6 +639,11 @@ public float getProgress() {
return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
}
@Override
public Phase getPhase() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public TaskAttemptState getState() {
if (overridingState != null) {

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@ -106,6 +107,11 @@ public synchronized TaskAttemptReport getReport() {
return report;
}
@Override
public Phase getPhase() {
return Phase.CLEANUP;
}
@Override
public TaskAttemptState getState() {
return state;