MAPREDUCE-7022. Fast fail rogue jobs based on task scratch dir size. Contributed by Johan Gustavsson

Jason Lowe 2018-01-26 14:36:45 -06:00
parent 1b0f265db1
commit a37e7f0ad8
22 changed files with 397 additions and 71 deletions
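
For context, a minimal sketch of how a job could opt in to the per-disk scratch limit introduced by this change, using the MRJobConfig keys added below; the values shown are illustrative only:

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class DiskLimitExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Fast fail the task once any single local scratch dir exceeds 10 GB.
    conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 10L * 1024 * 1024 * 1024);
    // Check disk usage every 5 seconds (the default interval).
    conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS, 5000L);
    // true (default): kill the task on violation; false: only log it.
    conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, true);
  }
}
```

The same keys are documented in the mapred-default.xml change below, so they can also be set cluster-wide.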

View File

@ -510,7 +510,7 @@ public class LocalContainerLauncher extends AbstractService implements
String cause =
(tCause == null) ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
umbilical.fatalError(classicAttemptID, cause);
umbilical.fatalError(classicAttemptID, cause, false);
}
throw new RuntimeException();
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@ -281,7 +282,7 @@ public class TaskAttemptListenerImpl extends CompositeService
}
@Override
public void fatalError(TaskAttemptID taskAttemptID, String msg)
public void fatalError(TaskAttemptID taskAttemptID, String msg, boolean fastFail)
throws IOException {
// This happens only in Child and in the Task.
LOG.error("Task: " + taskAttemptID + " - exited : " + msg);
@ -294,7 +295,7 @@ public class TaskAttemptListenerImpl extends CompositeService
preemptionPolicy.handleFailedContainer(attemptID);
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID, fastFail));
}
@Override
@ -312,7 +313,7 @@ public class TaskAttemptListenerImpl extends CompositeService
preemptionPolicy.handleFailedContainer(attemptID);
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
}
@Override

View File

@ -206,7 +206,7 @@ class YarnChild {
if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fatalError(taskid,
StringUtils.stringifyException(exception));
StringUtils.stringifyException(exception), false);
}
}
} catch (Throwable throwable) {
@ -218,7 +218,7 @@ class YarnChild {
String cause =
tCause == null ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
umbilical.fatalError(taskid, cause);
umbilical.fatalError(taskid, cause, false);
}
}
} finally {

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class TaskAttemptFailEvent extends TaskAttemptEvent {
private boolean fastFail;
/**
* Create a new TaskAttemptFailEvent, with task fastFail disabled.
*
* @param id the id of the task attempt
*/
public TaskAttemptFailEvent(TaskAttemptId id) {
this(id, false);
}
/**
* Create a new TaskAttemptFailEvent.
*
* @param id the id of the task attempt
* @param fastFail whether the task should fast fail or not.
*/
public TaskAttemptFailEvent(TaskAttemptId id, boolean fastFail) {
super(id, TaskAttemptEventType.TA_FAILMSG);
this.fastFail = fastFail;
}
/**
* Check whether the task should fast fail or be retried.
* @return true if the task should fast fail and not be retried
*/
public boolean isFastFail() {
return fastFail;
}
}
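
As a rough illustration of how the new event type carries the fast-fail flag, here is a sketch of dispatching it; the event handler and attempt id are assumed to come from the running AM (as in the TaskAttemptListenerImpl change above), not constructed here:

```java
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.yarn.event.EventHandler;

public class FailEventSketch {
  // Sketch only: handler and attemptId are supplied by the AM context.
  static void reportFailure(EventHandler<TaskAttemptFailEvent> handler,
      TaskAttemptId attemptId, boolean fastFail) {
    // fastFail == false: the attempt may still be retried up to the configured max attempts.
    // fastFail == true: TaskImpl fails the task without scheduling further attempts.
    handler.handle(new TaskAttemptFailEvent(attemptId, fastFail));
  }
}
```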

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class TaskTAttemptFailedEvent extends TaskTAttemptEvent {
private boolean fastFail;
public TaskTAttemptFailedEvent(TaskAttemptId id) {
this(id, false);
}
public TaskTAttemptFailedEvent(TaskAttemptId id, boolean fastFail) {
super(id, TaskEventType.T_ATTEMPT_FAILED);
this.fastFail = fastFail;
}
public boolean isFastFail() {
return fastFail;
}
}

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
@ -101,6 +102,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
@ -194,6 +196,7 @@ public abstract class TaskAttemptImpl implements
private Locality locality;
private Avataar avataar;
private boolean rescheduleNextAttempt = false;
private boolean failFast = false;
private static final CleanupContainerTransition
CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
@ -1412,6 +1415,14 @@ public abstract class TaskAttemptImpl implements
public void setAvataar(Avataar avataar) {
this.avataar = avataar;
}
public void setTaskFailFast(boolean failFast) {
this.failFast = failFast;
}
public boolean isTaskFailFast() {
return failFast;
}
@SuppressWarnings("unchecked")
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
@ -1921,9 +1932,12 @@ public abstract class TaskAttemptImpl implements
switch(finalState) {
case FAILED:
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_FAILED));
boolean fastFail = false;
if (event instanceof TaskAttemptFailEvent) {
fastFail = ((TaskAttemptFailEvent) event).isFastFail();
}
taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
taskAttempt.attemptId, fastFail));
break;
case KILLED:
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
@ -2041,13 +2055,16 @@ public abstract class TaskAttemptImpl implements
private static class FailedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// set the finish time
taskAttempt.setFinishTime();
notifyTaskAttemptFailed(taskAttempt);
notifyTaskAttemptFailed(taskAttempt, taskAttempt.isTaskFailFast());
}
}
@ -2154,8 +2171,8 @@ public abstract class TaskAttemptImpl implements
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
taskAttempt.attemptId));
}
}
@ -2332,6 +2349,8 @@ public abstract class TaskAttemptImpl implements
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.setRescheduleNextAttempt(
((TaskAttemptKillEvent)event).getRescheduleAttempt());
} else if (event instanceof TaskAttemptFailEvent) {
taskAttempt.setTaskFailFast(((TaskAttemptFailEvent)event).isFastFail());
}
}
}
@ -2400,12 +2419,13 @@ public abstract class TaskAttemptImpl implements
// register it to finishing state
taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
taskAttempt.attemptId);
notifyTaskAttemptFailed(taskAttempt);
notifyTaskAttemptFailed(taskAttempt, false);
}
}
@SuppressWarnings("unchecked")
private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt,
boolean fastFail) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
@ -2419,8 +2439,8 @@ public abstract class TaskAttemptImpl implements
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
taskAttempt.attemptId, fastFail));
}

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@ -1054,7 +1055,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
TaskTAttemptFailedEvent castEvent = (TaskTAttemptFailedEvent) event;
TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
task.failedAttempts.add(taskAttemptId);
if (taskAttemptId.equals(task.commitAttempt)) {
@ -1068,7 +1069,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
task.finishedAttempts.add(taskAttemptId);
if (task.failedAttempts.size() < task.maxAttempts) {
if (!castEvent.isFastFail()
&& task.failedAttempts.size() < task.maxAttempts) {
task.handleTaskAttemptCompletion(
taskAttemptId,
TaskAttemptCompletionEventStatus.FAILED);

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@ -288,8 +289,7 @@ public class TestFail {
if (attemptID.getTaskId().getId() == 0) {//check if it is first task
// send the Fail event
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
@ -310,8 +310,7 @@ public class TestFail {
//check if it is first task's first attempt
// send the Fail event
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,

View File

@ -38,6 +38,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@ -167,9 +169,8 @@ public class TestRecovery {
/////////// Play some games with the TaskAttempts of the first task //////
//send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(
task1Attempt1.getID()));
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);

View File

@ -81,7 +81,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@ -437,8 +437,7 @@ public class TestJobImpl {
TaskImpl task = (TaskImpl) t;
task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
for(TaskAttempt ta: task.getAttempts().values()) {
task.handle(new TaskTAttemptEvent(ta.getID(),
TaskEventType.T_ATTEMPT_FAILED));
task.handle(new TaskTAttemptFailedEvent(ta.getID()));
}
}

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -499,7 +500,7 @@ public class TestTaskAttempt{
new TaskAttemptDiagnosticsUpdateEvent(attemptID,
"Test Diagnostic Event"));
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
}
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
@ -1357,8 +1358,7 @@ public class TestTaskAttempt{
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_FAILMSG));
taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
@ -1484,8 +1484,7 @@ public class TestTaskAttempt{
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_FAILMSG));
taImpl.handle(new TaskAttemptFailEvent(taImpl.getID()));
assertEquals("Task attempt is not in RUNNING state", taImpl.getState(),
TaskAttemptState.FAILED);

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
@ -345,8 +346,7 @@ public class TestTaskImpl {
}
private void failRunningTaskAttempt(TaskAttemptId attemptId) {
mockTask.handle(new TaskTAttemptEvent(attemptId,
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(attemptId));
assertTaskRunningState();
}
@ -612,11 +612,16 @@ public class TestTaskImpl {
// The task should now have succeeded
assertTaskSucceededState();
// Now complete the first task attempt, after the second has succeeded
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
firstAttemptFinishEvent));
if (firstAttemptFinishEvent.equals(TaskEventType.T_ATTEMPT_FAILED)) {
mockTask.handle(new TaskTAttemptFailedEvent(taskAttempts
.get(0).getAttemptId()));
} else {
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
firstAttemptFinishEvent));
}
// The task should still be in the succeeded state
assertTaskSucceededState();
@ -668,8 +673,8 @@ public class TestTaskImpl {
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(
taskAttempts.get(1).getAttemptId()));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
@ -683,8 +688,8 @@ public class TestTaskImpl {
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(
taskAttempts.get(1).getAttemptId()));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
@ -698,8 +703,8 @@ public class TestTaskImpl {
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(
taskAttempts.get(1).getAttemptId()));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
@ -734,8 +739,8 @@ public class TestTaskImpl {
// have the first attempt fail, verify task failed due to no retries
MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
taskAttempt.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(
taskAttempt.getAttemptId()));
assertEquals(TaskState.FAILED, mockTask.getState());
// verify task can no longer be killed
@ -757,8 +762,7 @@ public class TestTaskImpl {
TaskEventType.T_ATTEMPT_COMMIT_PENDING));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId()));
assertEquals(TaskState.FAILED, mockTask.getState());
taskAttempt = taskAttempts.get(2);
taskAttempt.setState(TaskAttemptState.SUCCEEDED);
@ -808,8 +812,7 @@ public class TestTaskImpl {
// max attempts is 4
MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
taskAttempt.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId()));
assertEquals(TaskState.RUNNING, mockTask.getState());
// verify a new attempt(#3) added because the speculative attempt(#2)
@ -829,8 +832,7 @@ public class TestTaskImpl {
// hasn't reach the max attempts which is 4
MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1);
taskAttempt1.setState(TaskAttemptState.FAILED);
mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt1.getAttemptId()));
assertEquals(TaskState.RUNNING, mockTask.getState());
// verify there's no new attempt added because of the running attempt(#3)

View File

@ -729,9 +729,9 @@ public class LocalJobRunner implements ClientProtocol {
LOG.error("shuffleError: "+ message + "from task: " + taskId);
}
public synchronized void fatalError(TaskAttemptID taskId, String msg)
public synchronized void fatalError(TaskAttemptID taskId, String msg, boolean fastFail)
throws IOException {
LOG.error("Fatal: "+ msg + "from task: " + taskId);
LOG.error("Fatal: "+ msg + " from task: " + taskId + " fast fail: " + fastFail);
}
@Override

View File

@ -1568,7 +1568,8 @@ public class MapTask extends Task {
if (lspillException instanceof Error) {
final String logMsg = "Task " + getTaskID() + " failed : " +
StringUtils.stringifyException(lspillException);
mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
mapTask.reportFatalError(getTaskID(), lspillException, logMsg,
false);
}
throw new IOException("Spill failed", lspillException);
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@ -354,7 +355,7 @@ abstract public class Task implements Writable, Configurable {
* Report a fatal error to the parent (task) tracker.
*/
protected void reportFatalError(TaskAttemptID id, Throwable throwable,
String logMsg) {
String logMsg, boolean fastFail) {
LOG.error(logMsg);
if (ShutdownHookManager.get().isShutdownInProgress()) {
@ -366,7 +367,7 @@ abstract public class Task implements Writable, Configurable {
? StringUtils.stringifyException(throwable)
: StringUtils.stringifyException(tCause);
try {
umbilical.fatalError(id, cause);
umbilical.fatalError(id, cause, fastFail);
} catch (IOException ioe) {
LOG.error("Failed to contact the tasktracker", ioe);
System.exit(-1);
@ -652,6 +653,8 @@ abstract public class Task implements Writable, Configurable {
private Thread pingThread = null;
private boolean done = true;
private Object lock = new Object();
private volatile String diskLimitCheckStatus = null;
private Thread diskLimitCheckThread = null;
/**
* flag that indicates whether progress update needs to be sent to parent.
@ -748,6 +751,65 @@ abstract public class Task implements Writable, Configurable {
}
}
/**
* Disk limit checker; runs in a separate thread when activated.
*/
public class DiskLimitCheck implements Runnable {
private LocalFileSystem localFS;
private long fsLimit;
private long checkInterval;
private String[] localDirs;
private boolean killOnLimitExceeded;
public DiskLimitCheck(JobConf conf) throws IOException {
this.localFS = FileSystem.getLocal(conf);
this.fsLimit = conf.getLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES,
MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES);
this.localDirs = conf.getLocalDirs();
this.checkInterval = conf.getLong(
MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS,
MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS);
this.killOnLimitExceeded = conf.getBoolean(
MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED);
}
@Override
public void run() {
while (!taskDone.get()) {
try {
long localWritesSize = 0L;
String largestWorkDir = null;
for (String local : localDirs) {
long size = FileUtil.getDU(localFS.pathToFile(new Path(local)));
if (localWritesSize < size) {
localWritesSize = size;
largestWorkDir = local;
}
}
if (localWritesSize > fsLimit) {
String localStatus =
"too much data in local scratch dir="
+ largestWorkDir
+ ". current size is "
+ localWritesSize
+ " the limit is " + fsLimit;
if (killOnLimitExceeded) {
LOG.error(localStatus);
diskLimitCheckStatus = localStatus;
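// A non-null status makes checkTaskLimits() throw TaskLimitException below,
// and the reporter then fast fails the attempt via umbilical.fatalError(taskId, errMsg, true).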
} else {
LOG.warn(localStatus);
}
break;
}
Thread.sleep(checkInterval);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
}
}
/**
* check the counters to see whether the task has exceeded any configured
* limits.
@ -773,6 +835,9 @@ abstract public class Task implements Writable, Configurable {
" the limit is " + limit);
}
}
if (diskLimitCheckStatus != null) {
throw new TaskLimitException(diskLimitCheckStatus);
}
}
/**
@ -851,7 +916,7 @@ abstract public class Task implements Writable, Configurable {
StringUtils.stringifyException(e);
LOG.error(errMsg);
try {
umbilical.fatalError(taskId, errMsg);
umbilical.fatalError(taskId, errMsg, true);
} catch (IOException ioe) {
LOG.error("Failed to update failure diagnosis", ioe);
}
@ -884,6 +949,22 @@ abstract public class Task implements Writable, Configurable {
pingThread.setDaemon(true);
pingThread.start();
}
startDiskLimitCheckerThreadIfNeeded();
}
public void startDiskLimitCheckerThreadIfNeeded() {
if (diskLimitCheckThread == null && conf.getLong(
MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES,
MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES) >= 0) {
try {
diskLimitCheckThread = new Thread(new DiskLimitCheck(conf),
"disk limit check thread");
diskLimitCheckThread.setDaemon(true);
diskLimitCheckThread.start();
} catch (IOException e) {
LOG.error("Issues starting disk monitor thread: "
+ e.getMessage(), e);
}
}
}
public void stopCommunicationThread() throws InterruptedException {
if (pingThread != null) {

View File

@ -68,9 +68,10 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* Version 19 Added fatalError for child to communicate fatal errors to TT
* Version 20 Added methods to manage checkpoints
* Version 21 Added fastFail parameter to fatalError
* */
public static final long versionID = 20L;
public static final long versionID = 21L;
/**
* Called when a child task process starts, to get its task.
@ -140,8 +141,13 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol {
/** Report that the task encountered a local filesystem error. */
void fsError(TaskAttemptID taskId, String message) throws IOException;
/** Report that the task encountered a fatal error. */
void fatalError(TaskAttemptID taskId, String message) throws IOException;
/**
* Report that the task encountered a fatal error.
* @param taskId task's id
* @param message fail message
* @param fastFail flag to enable fast fail for task
*/
void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException;
/** Called by a reduce task to get the map output locations for finished maps.
* Returns an update centered around the map-task-completion-events.

View File

@ -52,6 +52,20 @@ public interface MRJobConfig {
public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
public static final String JOB_SINGLE_DISK_LIMIT_BYTES =
"mapreduce.job.local-fs.single-disk-limit.bytes";
// negative values disable the limit
public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1;
public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED =
"mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed";
// setting to false only logs the kill
public static final boolean DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = true;
public static final String JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS =
"mapreduce.job.local-fs.single-disk-limit.check.interval-ms";
public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = 5000;
public static final String TASK_LOCAL_WRITE_LIMIT_BYTES =
"mapreduce.task.local-fs.write-limit.bytes";
// negative values disable the limit

View File

@ -62,6 +62,28 @@
set to less than .5</description>
</property>
<property>
<name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
<value>-1</value>
<description>Enable an in-task monitor thread that watches the disk space
consumed by the task in its local scratch directories. When set to a number
of bytes, the task fast fails once any single local directory exceeds that
size; negative values disable the check. The limit is applied per disk.</description>
</property>
<property>
<name>mapreduce.job.local-fs.single-disk-limit.check.interval-ms</name>
<value>5000</value>
<description>Interval, in milliseconds, between disk limit checks.</description>
</property>
<property>
<name>mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed</name>
<value>true</value>
<description>Whether the task should be killed when the limit configured by
mapreduce.job.local-fs.single-disk-limit.bytes is exceeded. If false, the
violation is only logged in the container logs and the task keeps running.</description>
</property>
<property>
<name>mapreduce.job.maps</name>
<value>2</value>

View File

@ -18,15 +18,19 @@
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.util.ExitUtil;
@ -43,6 +47,11 @@ public class TestTaskProgressReporter {
private FakeUmbilical fakeUmbilical = new FakeUmbilical();
private static final String TEST_DIR =
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")) + "/" +
TestTaskProgressReporter.class.getName();
private static class DummyTask extends Task {
@Override
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
@ -53,6 +62,11 @@ public class TestTaskProgressReporter {
public boolean isMapTask() {
return true;
}
@Override
public boolean isCommitRequired() {
return false;
}
}
private static class FakeUmbilical implements TaskUmbilicalProtocol {
@ -118,7 +132,7 @@ public class TestTaskProgressReporter {
}
@Override
public void fatalError(TaskAttemptID taskId, String message)
public void fatalError(TaskAttemptID taskId, String message, boolean fastFail)
throws IOException {
}
@ -163,6 +177,78 @@ public class TestTaskProgressReporter {
}
}
@Test(timeout=60000)
public void testScratchDirSize() throws Exception {
String tmpPath = TEST_DIR + "/testBytesWrittenLimit-tmpFile-"
+ new Random(System.currentTimeMillis()).nextInt();
File data = new File(tmpPath + "/out");
File testDir = new File(tmpPath);
testDir.mkdirs();
testDir.deleteOnExit();
JobConf conf = new JobConf();
conf.setStrings(MRConfig.LOCAL_DIR, "file://" + tmpPath);
conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 1024L);
conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
true);
getBaseConfAndWriteToFile(-1, data);
testScratchDirLimit(false, conf);
data.delete();
getBaseConfAndWriteToFile(100, data);
testScratchDirLimit(false, conf);
data.delete();
getBaseConfAndWriteToFile(1536, data);
testScratchDirLimit(true, conf);
conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
false);
testScratchDirLimit(false, conf);
conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED,
true);
conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, -1L);
testScratchDirLimit(false, conf);
data.delete();
FileUtil.fullyDelete(testDir);
}
private void getBaseConfAndWriteToFile(int size, File data)
throws IOException {
if (size > 0) {
byte[] b = new byte[size];
for (int i = 0; i < size; i++) {
b[i] = 1;
}
FileUtils.writeByteArrayToFile(data, b);
}
}
public void testScratchDirLimit(boolean fastFail, JobConf conf)
throws Exception {
ExitUtil.disableSystemExit();
threadExited = false;
Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread th, Throwable ex) {
if (ex instanceof ExitUtil.ExitException) {
threadExited = true;
th.interrupt();
}
}
};
Task task = new DummyTask();
task.setConf(conf);
DummyTaskReporter reporter = new DummyTaskReporter(task);
reporter.startDiskLimitCheckerThreadIfNeeded();
Thread t = new Thread(reporter);
t.setUncaughtExceptionHandler(h);
reporter.setProgressFlag();
t.start();
while (!reporter.taskLimitIsChecked) {
Thread.yield();
}
task.done(fakeUmbilical, reporter);
reporter.resetDoneFlag();
t.join(1000L);
Assert.assertEquals(fastFail, threadExited);
}
@Test (timeout=10000)
public void testTaskProgress() throws Exception {
JobConf job = new JobConf();
@ -214,7 +300,7 @@ public class TestTaskProgressReporter {
conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0);
conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit);
LocalFileSystem localFS = FileSystem.getLocal(conf);
Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-"
Path tmpPath = new Path(TEST_DIR + "/testBytesWrittenLimit-tmpFile-"
+ new Random(System.currentTimeMillis()).nextInt());
FSDataOutputStream out = localFS.create(tmpPath, true);
out.write(new byte[LOCAL_BYTES_WRITTEN]);

View File

@ -36,6 +36,7 @@ import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@ -712,7 +713,7 @@ public class TestJobHistoryParsing {
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@ -732,7 +733,7 @@ public class TestJobHistoryParsing {
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
@ -760,10 +761,10 @@ public class TestJobHistoryParsing {
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));
} else if (taskType == TaskType.MAP && taskId == 1) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
} else if (taskType == TaskType.REDUCE && taskId == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
new TaskAttemptFailEvent(attemptID));
} else if (taskType == TaskType.REDUCE && taskId == 1) {
getContext().getEventHandler().handle(
new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL));

View File

@ -91,8 +91,8 @@ public class TestMapProgress {
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
LOG.info("Task " + taskId + " reporting fatal error: " + msg);
public void fatalError(TaskAttemptID taskId, String msg, boolean fastFail) throws IOException {
LOG.info("Task " + taskId + " reporting fatal error: " + msg + " fast fail: " + fastFail);
}
public JvmTask getTask(JvmContext context) throws IOException {

View File

@ -124,7 +124,7 @@ public class TestTaskCommit extends HadoopTestCase {
}
@Override
public void fatalError(TaskAttemptID taskId, String message)
public void fatalError(TaskAttemptID taskId, String message, boolean fastFail)
throws IOException { }
@Override