Merge MAPREDUCE-5000 from trunk. Fixes getCounters when speculating by fixing the selection of the best attempt for a task. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1445873 13f79535-47bb-0310-9956-ffa450edef68
Author: Siddharth Seth
Date:   2013-02-13 19:23:50 +00:00
Parent: 208238887c
Commit: d53d4cbca3
3 changed files with 65 additions and 2 deletions

CHANGES.txt

@@ -17,6 +17,9 @@ Release 2.0.4-beta - UNRELEASED
    MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu)

    MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
    of the best attempt for a task. (Jason Lowe via sseth)

Release 2.0.3-alpha - 2013-02-06

  INCOMPATIBLE CHANGES

TaskImpl.java

@@ -539,6 +539,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
  //select the nextAttemptNumber with best progress
  // always called inside the Read Lock
  private TaskAttempt selectBestAttempt() {
    if (successfulAttempt != null) {
      return attempts.get(successfulAttempt);
    }
    float progress = 0f;
    TaskAttempt result = null;
    for (TaskAttempt at : attempts.values()) {
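
The hunk above is the core of the fix: once a successful attempt has been recorded, selectBestAttempt() returns it immediately, and the progress-based loop (truncated here) only applies while no attempt has succeeded. The standalone sketch below illustrates that selection order using a hypothetical Attempt class and a simplified selectBestAttempt() helper, not the actual Hadoop TaskImpl code; it shows why a speculative attempt tied at the same progress can no longer shadow the attempt whose counters were actually committed, which is exactly the tie the new test further down sets up.

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, self-contained reduction of the selection logic above
// (illustration only, not the Hadoop source).
public class SelectBestAttemptSketch {

  static class Attempt {
    final String id;
    final float progress;
    Attempt(String id, float progress) {
      this.id = id;
      this.progress = progress;
    }
  }

  // successfulId is null until some attempt has succeeded.
  static Attempt selectBestAttempt(String successfulId, Map<String, Attempt> attempts) {
    if (successfulId != null) {
      // Early return introduced by MAPREDUCE-5000: a successful attempt always wins.
      return attempts.get(successfulId);
    }
    // Fallback while nothing has succeeded yet: pick the attempt with the best progress.
    float bestProgress = 0f;
    Attempt result = null;
    for (Attempt at : attempts.values()) {
      if (result == null || at.progress > bestProgress) {
        result = at;
        bestProgress = at.progress;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Attempt> attempts = new LinkedHashMap<String, Attempt>();
    attempts.put("attempt_0", new Attempt("attempt_0", 1.0f)); // original attempt
    attempts.put("attempt_1", new Attempt("attempt_1", 1.0f)); // speculative attempt that succeeded
    // Both attempts are tied at 1.0 progress; the progress loop alone would keep
    // attempt_0, but the early return reports the attempt that actually succeeded.
    System.out.println(selectBestAttempt("attempt_1", attempts).id); // prints attempt_1
  }
}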

TestTaskImpl.java

@@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -52,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -143,6 +145,7 @@ public class TestTaskImpl {
    private float progress = 0;
    private TaskAttemptState state = TaskAttemptState.NEW;
    private TaskType taskType;
    private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;

    public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
        TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -178,7 +181,15 @@ public class TestTaskImpl {
    public TaskAttemptState getState() {
      return state;
    }

    @Override
    public Counters getCounters() {
      return attemptCounters;
    }

    public void setCounters(Counters counters) {
      attemptCounters = counters;
    }
  }

  private class MockTask extends Task {
private class MockTask extends Task {
@@ -687,4 +698,49 @@ public class TestTaskImpl {
        TaskEventType.T_ATTEMPT_KILLED));
    assertEquals(TaskState.FAILED, mockTask.getState());
  }

  @Test
  public void testCountersWithSpeculation() {
    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
        remoteJobConfFile, conf, taskAttemptListener, jobToken,
        credentials, clock,
        completedTasksFromPreviousRun, startCount,
        metrics, appContext, TaskType.MAP) {
      @Override
      protected int getMaxAttempts() {
        return 1;
      }
    };
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.RUNNING);
    MockTaskAttemptImpl baseAttempt = getLastAttempt();

    // add a speculative attempt
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.RUNNING);
    MockTaskAttemptImpl specAttempt = getLastAttempt();
    assertEquals(2, taskAttempts.size());

    Counters specAttemptCounters = new Counters();
    Counter cpuCounter = specAttemptCounters.findCounter(
        TaskCounter.CPU_MILLISECONDS);
    cpuCounter.setValue(1000);
    specAttempt.setCounters(specAttemptCounters);

    // have the spec attempt succeed but second attempt at 1.0 progress as well
    commitTaskAttempt(specAttempt.getAttemptId());
    specAttempt.setProgress(1.0f);
    specAttempt.setState(TaskAttemptState.SUCCEEDED);
    mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_SUCCEEDED));
    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
    baseAttempt.setProgress(1.0f);

    Counters taskCounters = mockTask.getCounters();
    assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
  }
}