From 43204f0c836ed35270bdac99bf3bafdb78ae6b57 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Wed, 13 Feb 2013 19:20:31 +0000 Subject: [PATCH] MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection of the best attempt for a task. Contributed by Jason Lowe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1445871 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/TaskImpl.java | 4 ++ .../v2/app/job/impl/TestTaskImpl.java | 60 ++++++++++++++++++- 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8ce787dcb27..12ea8aaf432 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -169,6 +169,9 @@ Release 2.0.4-beta - UNRELEASED MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu) + MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection + of the best attempt for a task. 
(Jason Lowe via sseth) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 28950e96cc5..c45197e1579 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -539,6 +539,10 @@ public abstract class TaskImpl implements Task, EventHandler { //select the nextAttemptNumber with best progress // always called inside the Read Lock private TaskAttempt selectBestAttempt() { + if (successfulAttempt != null) { + return attempts.get(successfulAttempt); + } + float progress = 0f; TaskAttempt result = null; for (TaskAttempt at : attempts.values()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index c0cfc072284..3f03dc3fd32 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; +import 
org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; @@ -52,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; -import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -143,6 +145,7 @@ public class TestTaskImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; private TaskType taskType; + private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS; public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Path jobFile, int partition, @@ -178,7 +181,15 @@ public class TestTaskImpl { public TaskAttemptState getState() { return state; } - + + @Override + public Counters getCounters() { + return attemptCounters; + } + + public void setCounters(Counters counters) { + attemptCounters = counters; + } } private class MockTask extends Task { @@ -687,4 +698,49 @@ public class TestTaskImpl { TaskEventType.T_ATTEMPT_KILLED)); assertEquals(TaskState.FAILED, mockTask.getState()); } + + @Test + public void testCountersWithSpeculation() { + mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), + remoteJobConfFile, conf, taskAttemptListener, jobToken, + credentials, clock, + completedTasksFromPreviousRun, startCount, + metrics, appContext, TaskType.MAP) { + @Override + protected int 
getMaxAttempts() { + return 1; + } + }; + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.RUNNING); + MockTaskAttemptImpl baseAttempt = getLastAttempt(); + + // add a speculative attempt + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.RUNNING); + MockTaskAttemptImpl specAttempt = getLastAttempt(); + assertEquals(2, taskAttempts.size()); + + Counters specAttemptCounters = new Counters(); + Counter cpuCounter = specAttemptCounters.findCounter( + TaskCounter.CPU_MILLISECONDS); + cpuCounter.setValue(1000); + specAttempt.setCounters(specAttemptCounters); + + // have the spec attempt succeed but base attempt at 1.0 progress as well + commitTaskAttempt(specAttempt.getAttemptId()); + specAttempt.setProgress(1.0f); + specAttempt.setState(TaskAttemptState.SUCCEEDED); + mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + assertEquals(TaskState.SUCCEEDED, mockTask.getState()); + baseAttempt.setProgress(1.0f); + + Counters taskCounters = mockTask.getCounters(); + assertEquals("wrong counters for task", specAttemptCounters, taskCounters); + } }