MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed task-attempts for obtaining task's progress. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1182230 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-12 06:51:32 +00:00
parent c3792b4b3f
commit d6546fc0a4
3 changed files with 413 additions and 2 deletions

View File

@ -1585,6 +1585,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3140. Fixed the invalid JobHistory URL for failed
applications. (Subroto Sanyal via vinodkv)
MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed
task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -441,10 +441,20 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
float progress = 0f; float progress = 0f;
TaskAttempt result = null; TaskAttempt result = null;
for (TaskAttempt at : attempts.values()) { for (TaskAttempt at : attempts.values()) {
switch (at.getState()) {
// ignore all failed task attempts
case FAIL_CONTAINER_CLEANUP:
case FAIL_TASK_CLEANUP:
case FAILED:
case KILL_CONTAINER_CLEANUP:
case KILL_TASK_CLEANUP:
case KILLED:
continue;
}
if (result == null) { if (result == null) {
result = at; //The first time around result = at; //The first time around
} }
//TODO: consider the nextAttemptNumber only if it is not failed/killed ?
// calculate the best progress // calculate the best progress
if (at.getProgress() > progress) { if (at.getProgress() > progress) {
result = at; result = at;
@ -496,7 +506,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
break; break;
case 1: case 1:
Map newAttempts Map<TaskAttemptId, TaskAttempt> newAttempts
= new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts); = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
newAttempts.putAll(attempts); newAttempts.putAll(attempts);
attempts = newAttempts; attempts = newAttempts;

View File

@ -0,0 +1,398 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Unit tests for {@link TaskImpl} state transitions and progress reporting.
 * The task and its attempts are exercised through lightweight mock
 * subclasses so no containers or RPC are involved; events are delivered
 * synchronously via an {@link InlineDispatcher}.
 */
public class TestTaskImpl {

  private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);

  private Configuration conf;
  private TaskAttemptListener taskAttemptListener;
  private OutputCommitter committer;
  private Token<JobTokenIdentifier> jobToken;
  private JobId jobId;
  private Path remoteJobConfFile;
  private Collection<Token<? extends TokenIdentifier>> fsTokens;
  private Clock clock;
  private Set<TaskId> completedTasksFromPreviousRun;
  private MRAppMetrics metrics;
  private TaskImpl mockTask;
  private ApplicationId appId;
  private TaskSplitMetaInfo taskSplitMetaInfo;
  private String[] dataLocations = new String[0];
  private final TaskType taskType = TaskType.MAP;

  private int startCount = 0;
  private int taskCounter = 0;
  private final int partition = 1;

  private InlineDispatcher dispatcher;
  // Every attempt created by MockTaskImpl is recorded here so tests can
  // drive/inspect the most recent one via getLastAttempt().
  private List<MockTaskAttemptImpl> taskAttempts;

  /**
   * A {@link TaskImpl} whose attempts are {@link MockTaskAttemptImpl}s;
   * each created attempt is appended to {@link #taskAttempts}.
   */
  private class MockTaskImpl extends TaskImpl {

    private int taskAttemptCounter = 0;

    @SuppressWarnings("rawtypes")
    public MockTaskImpl(JobId jobId, int partition,
        EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
        Token<JobTokenIdentifier> jobToken,
        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
        Set<TaskId> completedTasksFromPreviousRun, int startCount,
        MRAppMetrics metrics) {
      super(jobId, taskType , partition, eventHandler,
          remoteJobConfFile, conf, taskAttemptListener, committer,
          jobToken, fsTokens, clock,
          completedTasksFromPreviousRun, startCount, metrics);
    }

    @Override
    public TaskType getType() {
      return taskType;
    }

    @Override
    protected TaskAttemptImpl createAttempt() {
      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
          eventHandler, taskAttemptListener, remoteJobConfFile, partition,
          conf, committer, jobToken, fsTokens, clock);
      taskAttempts.add(attempt);
      return attempt;
    }

    @Override
    protected int getMaxAttempts() {
      return 100;
    }
  }

  /**
   * A {@link TaskAttemptImpl} whose progress and state are directly
   * settable by the test, bypassing the attempt's own state machine.
   */
  private class MockTaskAttemptImpl extends TaskAttemptImpl {

    private float progress = 0;
    private TaskAttemptState state = TaskAttemptState.NEW;
    private TaskAttemptId attemptId;

    @SuppressWarnings("rawtypes")
    public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
        TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
        Configuration conf, OutputCommitter committer,
        Token<JobTokenIdentifier> jobToken,
        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
      super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
          dataLocations, committer, jobToken, fsTokens, clock);
      attemptId = Records.newRecord(TaskAttemptId.class);
      attemptId.setId(id);
      attemptId.setTaskId(taskId);
    }

    public TaskAttemptId getAttemptId() {
      return attemptId;
    }

    @Override
    protected Task createRemoteTask() {
      return new MockTask();
    }

    @Override
    public float getProgress() {
      return progress;
    }

    public void setProgress(float progress) {
      this.progress = progress;
    }

    public void setState(TaskAttemptState state) {
      this.state = state;
    }

    @Override
    public TaskAttemptState getState() {
      return state;
    }
  }

  /** A minimal remote map task; run() is never expected to do real work. */
  private class MockTask extends Task {

    @Override
    @SuppressWarnings("deprecation")
    public void run(JobConf job, TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
      return;
    }

    @Override
    public boolean isMapTask() {
      return true;
    }
  }

  @Before
  @SuppressWarnings("unchecked")
  public void setup() {
    dispatcher = new InlineDispatcher();

    ++startCount;

    conf = new Configuration();
    taskAttemptListener = mock(TaskAttemptListener.class);
    committer = mock(OutputCommitter.class);
    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
    remoteJobConfFile = mock(Path.class);
    fsTokens = null;
    clock = new SystemClock();
    metrics = mock(MRAppMetrics.class);
    dataLocations = new String[1];

    appId = Records.newRecord(ApplicationId.class);
    appId.setClusterTimestamp(System.currentTimeMillis());
    appId.setId(1);

    jobId = Records.newRecord(JobId.class);
    jobId.setId(1);
    jobId.setAppId(appId);

    taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
    when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);

    taskAttempts = new ArrayList<MockTaskAttemptImpl>();

    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
        fsTokens, clock,
        completedTasksFromPreviousRun, startCount,
        metrics);
  }

  @After
  public void teardown() {
    taskAttempts.clear();
  }

  private TaskId getNewTaskID() {
    TaskId taskId = Records.newRecord(TaskId.class);
    taskId.setId(++taskCounter);
    taskId.setJobId(jobId);
    taskId.setTaskType(mockTask.getType());
    return taskId;
  }

  private void scheduleTaskAttempt(TaskId taskId) {
    mockTask.handle(new TaskEvent(taskId,
        TaskEventType.T_SCHEDULE));
    assertTaskScheduledState();
  }

  private void killTask(TaskId taskId) {
    mockTask.handle(new TaskEvent(taskId,
        TaskEventType.T_KILL));
    assertTaskKillWaitState();
  }

  private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_KILLED));
    // killing a not-yet-launched attempt keeps the task SCHEDULED
    assertTaskScheduledState();
  }

  private void launchTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_LAUNCHED));
    assertTaskRunningState();
  }

  private MockTaskAttemptImpl getLastAttempt() {
    return taskAttempts.get(taskAttempts.size() - 1);
  }

  private void updateLastAttemptProgress(float p) {
    getLastAttempt().setProgress(p);
  }

  private void updateLastAttemptState(TaskAttemptState s) {
    getLastAttempt().setState(s);
  }

  private void killRunningTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_KILLED));
    // killing one attempt of a running task leaves the task RUNNING
    assertTaskRunningState();
  }

  /**
   * {@link TaskState#NEW}
   */
  private void assertTaskNewState() {
    assertEquals(TaskState.NEW, mockTask.getState());
  }

  /**
   * {@link TaskState#SCHEDULED}
   */
  private void assertTaskScheduledState() {
    assertEquals(TaskState.SCHEDULED, mockTask.getState());
  }

  /**
   * {@link TaskState#RUNNING}
   */
  private void assertTaskRunningState() {
    assertEquals(TaskState.RUNNING, mockTask.getState());
  }

  /**
   * {@link TaskState#KILL_WAIT}
   */
  private void assertTaskKillWaitState() {
    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
  }

  @Test
  public void testInit() {
    LOG.info("--- START: testInit ---");
    assertTaskNewState();
    assertEquals(0, taskAttempts.size());
  }

  /**
   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
   */
  @Test
  public void testScheduleTask() {
    LOG.info("--- START: testScheduleTask ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
  }

  /**
   * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
   */
  @Test
  public void testKillScheduledTask() {
    LOG.info("--- START: testKillScheduledTask ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    killTask(taskId);
  }

  /**
   * Kill attempt
   * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
   */
  @Test
  public void testKillScheduledTaskAttempt() {
    LOG.info("--- START: testKillScheduledTaskAttempt ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
  }

  /**
   * Launch attempt
   * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
   */
  @Test
  public void testLaunchTaskAttempt() {
    LOG.info("--- START: testLaunchTaskAttempt ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
  }

  /**
   * Kill running attempt
   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
   */
  @Test
  public void testKillRunningTaskAttempt() {
    LOG.info("--- START: testKillRunningTaskAttempt ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    killRunningTaskAttempt(getLastAttempt().getAttemptId());
  }

  /**
   * Verifies that the task's reported progress tracks only live attempts:
   * once an attempt is failed/killed its progress must no longer count
   * (MAPREDUCE-3125).
   */
  @Test
  public void testTaskProgress() {
    LOG.info("--- START: testTaskProgress ---");

    // launch task
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    float progress = 0f;
    assertEquals(progress, mockTask.getProgress(), 0f);
    launchTaskAttempt(getLastAttempt().getAttemptId());

    // update attempt1
    progress = 50f;
    updateLastAttemptProgress(progress);
    assertEquals(progress, mockTask.getProgress(), 0f);
    progress = 100f;
    updateLastAttemptProgress(progress);
    assertEquals(progress, mockTask.getProgress(), 0f);

    // mark first attempt as killed: its progress must stop counting
    progress = 0f;
    updateLastAttemptState(TaskAttemptState.KILLED);
    assertEquals(progress, mockTask.getProgress(), 0f);

    // kill first attempt
    // should trigger a new attempt
    // as no successful attempts
    killRunningTaskAttempt(getLastAttempt().getAttemptId());
    assertEquals(2, taskAttempts.size());
    assertEquals(0f, mockTask.getProgress(), 0f);

    launchTaskAttempt(getLastAttempt().getAttemptId());
    progress = 50f;
    updateLastAttemptProgress(progress);
    assertEquals(progress, mockTask.getProgress(), 0f);
  }
}