svn merge -c 1452372 FIXES: MAPREDUCE-5043. Fetch failure processing can cause AM event queue to backup and eventually OOM (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1452451 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2013-03-04 19:06:26 +00:00
parent b57b5cec27
commit 611d6d543e
7 changed files with 46 additions and 14 deletions

View File

@@ -608,6 +608,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
shutdown (Jason Lowe via jeagles)
MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
backup and eventually OOM (Jason Lowe via bobby)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -37,6 +38,7 @@ public interface TaskAttempt {
List<String> getDiagnostics();
Counters getCounters();
float getProgress();
Phase getPhase();
TaskAttemptState getState();
/**

View File

@@ -1672,6 +1672,20 @@ private static class TaskAttemptFetchFailureTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
//get number of shuffling reduces
int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
Task task = job.tasks.get(taskId);
if (TaskState.RUNNING.equals(task.getState())) {
for(TaskAttempt attempt : task.getAttempts().values()) {
if(attempt.getPhase() == Phase.SHUFFLE) {
shufflingReduceTasks++;
break;
}
}
}
}
JobTaskAttemptFetchFailureEvent fetchfailureEvent =
(JobTaskAttemptFetchFailureEvent) event;
for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId :
@@ -1680,20 +1694,6 @@ public void transition(JobImpl job, JobEvent event) {
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
job.fetchFailuresMapping.put(mapId, fetchFailures);
//get number of shuffling reduces
int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
Task task = job.tasks.get(taskId);
if (TaskState.RUNNING.equals(task.getState())) {
for(TaskAttempt attempt : task.getAttempts().values()) {
if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
shufflingReduceTasks++;
break;
}
}
}
}
float failureRate = shufflingReduceTasks == 0 ? 1.0f :
(float) fetchFailures / shufflingReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures

View File

@@ -993,6 +993,16 @@ public float getProgress() {
}
}
@Override
public Phase getPhase() {
// Return the phase most recently reported by this attempt (held in
// reportedStatus). Taken under the read lock so a concurrent status
// update cannot be observed mid-write.
readLock.lock();
try {
return reportedStatus.phase;
} finally {
readLock.unlock();
}
}
@Override
public TaskAttemptState getState() {
readLock.lock();

View File

@@ -276,6 +276,11 @@ public float getProgress() {
return report.getProgress();
}
@Override
public Phase getPhase() {
// Simple delegation: the phase lives in the underlying TaskAttemptReport.
return report.getPhase();
}
@Override
public TaskAttemptState getState() {
return report.getTaskAttemptState();

View File

@@ -39,6 +39,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -638,6 +639,11 @@ public float getProgress() {
return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
}
@Override
public Phase getPhase() {
// Phase tracking is deliberately unimplemented in this (mock/local)
// TaskAttempt; any caller reaching this path is an error.
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public TaskAttemptState getState() {
if (overridingState != null) {

View File

@@ -24,6 +24,7 @@
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -106,6 +107,11 @@ public synchronized TaskAttemptReport getReport() {
return report;
}
@Override
public Phase getPhase() {
// Always reports the final CLEANUP phase — presumably because this class
// represents an already-completed attempt reconstructed from job history
// (see the JobHistoryParser.TaskAttemptInfo import), so all earlier
// phases are over. NOTE(review): confirm against the enclosing class.
return Phase.CLEANUP;
}
@Override
public TaskAttemptState getState() {
return state;