MAPREDUCE-6485. Create a new task attempt with failed map task priority if in-progress attempts are unassigned. (Xianyin Xin via rohithsharmaks)

(cherry picked from commit 439f43ad3d)
This commit is contained in:
rohithsharmaks 2015-10-02 20:34:34 +05:30
parent 966f9508c3
commit 7dd9ebdebc
4 changed files with 117 additions and 7 deletions

View File

@ -319,6 +319,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6494. Permission issue when running archive-logs tool as MAPREDUCE-6494. Permission issue when running archive-logs tool as
different users (rkanter) different users (rkanter)
MAPREDUCE-6485. Create a new task attempt with failed map task priority
if in-progress attempts are unassigned. (Xianyin Xin via rohithsharmaks)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1349,7 +1349,7 @@ public abstract class TaskAttemptImpl implements
return attemptState; return attemptState;
} }
private static TaskAttemptState getExternalState( protected static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) { TaskAttemptStateInternal smState) {
switch (smState) { switch (smState) {
case ASSIGNED: case ASSIGNED:
@ -1381,6 +1381,11 @@ public abstract class TaskAttemptImpl implements
} }
} }
// check whether the attempt is assigned if container is not null
boolean isContainerAssigned() {
return container != null;
}
//always called in write lock //always called in write lock
private void setFinishTime() { private void setFinishTime() {
//set the finish time only if launch time is set //set the finish time only if launch time is set

View File

@ -1057,9 +1057,21 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptCompletionEventStatus.FAILED); TaskAttemptCompletionEventStatus.FAILED);
// we don't need a new event if we already have a spare // we don't need a new event if we already have a spare
task.inProgressAttempts.remove(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId);
if (task.inProgressAttempts.size() == 0 if (task.successfulAttempt == null) {
&& task.successfulAttempt == null) { boolean shouldAddNewAttempt = true;
task.addAndScheduleAttempt(Avataar.VIRGIN); if (task.inProgressAttempts.size() > 0) {
// if not all of the inProgressAttempts are hanging for resource
for (TaskAttemptId attemptId : task.inProgressAttempts) {
if (((TaskAttemptImpl) task.getAttempt(attemptId))
.isContainerAssigned()) {
shouldAddNewAttempt = false;
break;
}
}
}
if (shouldAddNewAttempt) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
}
} }
} else { } else {
task.handleTaskAttemptCompletion( task.handleTaskAttemptCompletion(
@ -1080,7 +1092,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
taskFailedEvent)); taskFailedEvent));
} else { } else {
LOG.debug("Not generating HistoryFinish event since start event not" + LOG.debug("Not generating HistoryFinish event since start event not" +
" generated for task: " + task.getID()); " generated for task: " + task.getID());
} }
task.eventHandler.handle( task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.FAILED)); new JobTaskEvent(task.taskId, TaskState.FAILED));

View File

@ -50,6 +50,8 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
@ -140,6 +143,8 @@ public class TestTaskImpl {
private float progress = 0; private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW; private TaskAttemptState state = TaskAttemptState.NEW;
boolean rescheduled = false;
boolean containerAssigned = false;
private TaskType taskType; private TaskType taskType;
private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS; private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
@ -153,6 +158,15 @@ public class TestTaskImpl {
this.taskType = taskType; this.taskType = taskType;
} }
public void assignContainer() {
containerAssigned = true;
}
@Override
boolean isContainerAssigned() {
return containerAssigned;
}
public TaskAttemptId getAttemptId() { public TaskAttemptId getAttemptId() {
return getID(); return getID();
} }
@ -173,11 +187,20 @@ public class TestTaskImpl {
public void setState(TaskAttemptState state) { public void setState(TaskAttemptState state) {
this.state = state; this.state = state;
} }
@Override
public TaskAttemptState getState() { public TaskAttemptState getState() {
return state; return state;
} }
public boolean getRescheduled() {
return this.rescheduled;
}
public void setRescheduled(boolean rescheduled) {
this.rescheduled = rescheduled;
}
@Override @Override
public Counters getCounters() { public Counters getCounters() {
return attemptCounters; return attemptCounters;
@ -279,7 +302,9 @@ public class TestTaskImpl {
private void launchTaskAttempt(TaskAttemptId attemptId) { private void launchTaskAttempt(TaskAttemptId attemptId) {
mockTask.handle(new TaskTAttemptEvent(attemptId, mockTask.handle(new TaskTAttemptEvent(attemptId,
TaskEventType.T_ATTEMPT_LAUNCHED)); TaskEventType.T_ATTEMPT_LAUNCHED));
assertTaskRunningState(); ((MockTaskAttemptImpl)(mockTask.getAttempt(attemptId)))
.assignContainer();
assertTaskRunningState();
} }
private void commitTaskAttempt(TaskAttemptId attemptId) { private void commitTaskAttempt(TaskAttemptId attemptId) {
@ -708,6 +733,71 @@ public class TestTaskImpl {
assertEquals(TaskState.FAILED, mockTask.getState()); assertEquals(TaskState.FAILED, mockTask.getState());
} }
private class PartialAttemptEventHandler implements EventHandler {
@Override
public void handle(Event event) {
if (event instanceof TaskAttemptEvent)
if (event.getType() == TaskAttemptEventType.TA_RESCHEDULE) {
TaskAttempt attempt = mockTask.getAttempt(((TaskAttemptEvent) event).getTaskAttemptID());
((MockTaskAttemptImpl)attempt).setRescheduled(true);
}
}
}
@Test
public void testFailedTransitionWithHangingSpeculativeMap() {
mockTask = new MockTaskImpl(jobId, partition, new PartialAttemptEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 4;
}
};
// start a new task, schedule and launch a new attempt
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
// add a speculative attempt(#2), but not launch it
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ADD_SPEC_ATTEMPT));
// have the first attempt(#1) fail, verify task still running since the
// max attempts is 4
MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
taskAttempt.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertEquals(TaskState.RUNNING, mockTask.getState());
// verify a new attempt(#3) added because the speculative attempt(#2)
// is hanging
assertEquals(3, taskAttempts.size());
// verify the speculative attempt(#2) is not a rescheduled attempt
assertEquals(false, taskAttempts.get(1).getRescheduled());
// verify the third attempt is a rescheduled attempt
assertEquals(true, taskAttempts.get(2).getRescheduled());
// now launch the latest attempt(#3) and set the internal state to running
launchTaskAttempt(getLastAttempt().getAttemptId());
// have the speculative attempt(#2) fail, verify task still since it
// hasn't reach the max attempts which is 4
MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1);
taskAttempt1.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertEquals(TaskState.RUNNING, mockTask.getState());
// verify there's no new attempt added because of the running attempt(#3)
assertEquals(3, taskAttempts.size());
}
@Test @Test
public void testCountersWithSpeculation() { public void testCountersWithSpeculation() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(), mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),