diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d01f7b24980..6005cb14f82 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -608,6 +608,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-4794. DefaultSpeculator generates error messages on normal shutdown (Jason Lowe via jeagles) + MAPREDUCE-5043. Fetch failure processing can cause AM event queue to + backup and eventually OOM (Jason Lowe via bobby) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java index af4aef7d9c0..1ca9dffb66d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.job; import java.util.List; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -37,6 +38,7 @@ public interface TaskAttempt { List getDiagnostics(); Counters getCounters(); float getProgress(); + Phase getPhase(); TaskAttemptState getState(); /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index ef09a396056..3b0804f0c2f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1672,6 +1672,20 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, SingleArcTransition { @Override public void transition(JobImpl job, JobEvent event) { + //get number of shuffling reduces + int shufflingReduceTasks = 0; + for (TaskId taskId : job.reduceTasks) { + Task task = job.tasks.get(taskId); + if (TaskState.RUNNING.equals(task.getState())) { + for(TaskAttempt attempt : task.getAttempts().values()) { + if(attempt.getPhase() == Phase.SHUFFLE) { + shufflingReduceTasks++; + break; + } + } + } + } + JobTaskAttemptFetchFailureEvent fetchfailureEvent = (JobTaskAttemptFetchFailureEvent) event; for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : @@ -1680,20 +1694,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1); job.fetchFailuresMapping.put(mapId, fetchFailures); - //get number of shuffling reduces - int shufflingReduceTasks = 0; - for (TaskId taskId : job.reduceTasks) { - Task task = job.tasks.get(taskId); - if (TaskState.RUNNING.equals(task.getState())) { - for(TaskAttempt attempt : task.getAttempts().values()) { - if(attempt.getReport().getPhase() == Phase.SHUFFLE) { - shufflingReduceTasks++; - break; - } - } - } - } - float failureRate = shufflingReduceTasks == 0 ? 1.0f : (float) fetchFailures / shufflingReduceTasks; // declare faulty if fetch-failures >= max-allowed-failures diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 62f00b95c0e..2eb04d3873d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -993,6 +993,16 @@ public abstract class TaskAttemptImpl implements } } + @Override + public Phase getPhase() { + readLock.lock(); + try { + return reportedStatus.phase; + } finally { + readLock.unlock(); + } + } + @Override public TaskAttemptState getState() { readLock.lock(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 5bab5cd3518..c1cddb8673f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -276,6 +276,11 @@ public class MockJobs extends MockApps { return report.getProgress(); } + @Override + public Phase getPhase() { + return report.getPhase(); + } + @Override public TaskAttemptState getState() { return report.getTaskAttemptState(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 2ddab831275..0e20d6f384a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; @@ -638,6 +639,11 @@ public class TestRuntimeEstimators { return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress(); } + @Override + public Phase getPhase() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public TaskAttemptState getState() { if (overridingState != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index 8f8e77615cd..0aa2e0bece4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -106,6 +107,11 @@ public class CompletedTaskAttempt implements TaskAttempt { return report; } + @Override + public Phase getPhase() { + return Phase.CLEANUP; + } + @Override public TaskAttemptState getState() { return state;