MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection of the best attempt for a task. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1445871 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
666667464a
commit
43204f0c83
|
@ -169,6 +169,9 @@ Release 2.0.4-beta - UNRELEASED
|
||||||
|
|
||||||
MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu)
|
MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
|
||||||
|
of the best attempt for a task. (Jason Lowe via sseth)
|
||||||
|
|
||||||
Release 2.0.3-alpha - 2013-02-06
|
Release 2.0.3-alpha - 2013-02-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -539,6 +539,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
//select the nextAttemptNumber with best progress
|
//select the nextAttemptNumber with best progress
|
||||||
// always called inside the Read Lock
|
// always called inside the Read Lock
|
||||||
private TaskAttempt selectBestAttempt() {
|
private TaskAttempt selectBestAttempt() {
|
||||||
|
if (successfulAttempt != null) {
|
||||||
|
return attempts.get(successfulAttempt);
|
||||||
|
}
|
||||||
|
|
||||||
float progress = 0f;
|
float progress = 0f;
|
||||||
TaskAttempt result = null;
|
TaskAttempt result = null;
|
||||||
for (TaskAttempt at : attempts.values()) {
|
for (TaskAttempt at : attempts.values()) {
|
||||||
|
|
|
@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.Task;
|
import org.apache.hadoop.mapred.Task;
|
||||||
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
||||||
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
|
@ -52,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -143,6 +145,7 @@ public class TestTaskImpl {
|
||||||
private float progress = 0;
|
private float progress = 0;
|
||||||
private TaskAttemptState state = TaskAttemptState.NEW;
|
private TaskAttemptState state = TaskAttemptState.NEW;
|
||||||
private TaskType taskType;
|
private TaskType taskType;
|
||||||
|
private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
|
||||||
|
|
||||||
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
|
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
|
||||||
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
|
||||||
|
@ -178,7 +181,15 @@ public class TestTaskImpl {
|
||||||
public TaskAttemptState getState() {
|
public TaskAttemptState getState() {
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Counters getCounters() {
|
||||||
|
return attemptCounters;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCounters(Counters counters) {
|
||||||
|
attemptCounters = counters;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MockTask extends Task {
|
private class MockTask extends Task {
|
||||||
|
@ -687,4 +698,49 @@ public class TestTaskImpl {
|
||||||
TaskEventType.T_ATTEMPT_KILLED));
|
TaskEventType.T_ATTEMPT_KILLED));
|
||||||
assertEquals(TaskState.FAILED, mockTask.getState());
|
assertEquals(TaskState.FAILED, mockTask.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCountersWithSpeculation() {
|
||||||
|
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
|
||||||
|
remoteJobConfFile, conf, taskAttemptListener, jobToken,
|
||||||
|
credentials, clock,
|
||||||
|
completedTasksFromPreviousRun, startCount,
|
||||||
|
metrics, appContext, TaskType.MAP) {
|
||||||
|
@Override
|
||||||
|
protected int getMaxAttempts() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
TaskId taskId = getNewTaskID();
|
||||||
|
scheduleTaskAttempt(taskId);
|
||||||
|
launchTaskAttempt(getLastAttempt().getAttemptId());
|
||||||
|
updateLastAttemptState(TaskAttemptState.RUNNING);
|
||||||
|
MockTaskAttemptImpl baseAttempt = getLastAttempt();
|
||||||
|
|
||||||
|
// add a speculative attempt
|
||||||
|
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
|
||||||
|
TaskEventType.T_ADD_SPEC_ATTEMPT));
|
||||||
|
launchTaskAttempt(getLastAttempt().getAttemptId());
|
||||||
|
updateLastAttemptState(TaskAttemptState.RUNNING);
|
||||||
|
MockTaskAttemptImpl specAttempt = getLastAttempt();
|
||||||
|
assertEquals(2, taskAttempts.size());
|
||||||
|
|
||||||
|
Counters specAttemptCounters = new Counters();
|
||||||
|
Counter cpuCounter = specAttemptCounters.findCounter(
|
||||||
|
TaskCounter.CPU_MILLISECONDS);
|
||||||
|
cpuCounter.setValue(1000);
|
||||||
|
specAttempt.setCounters(specAttemptCounters);
|
||||||
|
|
||||||
|
// have the spec attempt succeed while the base attempt reaches 1.0 progress as well
|
||||||
|
commitTaskAttempt(specAttempt.getAttemptId());
|
||||||
|
specAttempt.setProgress(1.0f);
|
||||||
|
specAttempt.setState(TaskAttemptState.SUCCEEDED);
|
||||||
|
mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
|
||||||
|
TaskEventType.T_ATTEMPT_SUCCEEDED));
|
||||||
|
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
|
||||||
|
baseAttempt.setProgress(1.0f);
|
||||||
|
|
||||||
|
Counters taskCounters = mockTask.getCounters();
|
||||||
|
assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue