MAPREDUCE-3058. Fixed MR YarnChild to report failure when task throws an error and thus prevent a hanging task and job. (vinodkv)

svn merge -c r1187654 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1187655 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-22 06:17:25 +00:00
parent abae89ed32
commit 1605fb7dbc
5 changed files with 22 additions and 2 deletions

View File

@ -1673,6 +1673,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3242. Trunk compilation broken with bad interaction from MAPREDUCE-3242. Trunk compilation broken with bad interaction from
MAPREDUCE-3070 and MAPREDUCE-3239. (mahadev) MAPREDUCE-3070 and MAPREDUCE-3239. (mahadev)
MAPREDUCE-3058. Fixed MR YarnChild to report failure when task throws an
error and thus prevent a hanging task and job. (vinodkv)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -177,7 +177,7 @@ class YarnChild {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos)); exception.printStackTrace(new PrintStream(baos));
if (taskid != null) { if (taskid != null) {
umbilical.reportDiagnosticInfo(taskid, baos.toString()); umbilical.fatalError(taskid, baos.toString());
} }
} catch (Throwable throwable) { } catch (Throwable throwable) {
LOG.fatal("Error running child : " LOG.fatal("Error running child : "

View File

@ -30,6 +30,22 @@ import org.apache.hadoop.mapreduce.Mapper;
public class FailingMapper extends Mapper<Text, Text, Text, Text> { public class FailingMapper extends Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value, public void map(Text key, Text value,
Context context) throws IOException,InterruptedException { Context context) throws IOException,InterruptedException {
// Just create a non-daemon thread which hangs forever. MR AM should not be
// hung by this.
new Thread() {
@Override
public void run() {
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
//
}
}
}
}.start();
if (context.getTaskAttemptID().getId() == 0) { if (context.getTaskAttemptID().getId() == 0) {
System.out.println("Attempt:" + context.getTaskAttemptID() + System.out.println("Attempt:" + context.getTaskAttemptID() +
" Failing mapper throwing exception"); " Failing mapper throwing exception");

View File

@ -299,7 +299,6 @@ public class TestMRJobs {
throws IOException, InterruptedException, ClassNotFoundException { throws IOException, InterruptedException, ClassNotFoundException {
Configuration myConf = new Configuration(mrCluster.getConfig()); Configuration myConf = new Configuration(mrCluster.getConfig());
myConf.setInt(MRJobConfig.NUM_MAPS, 1); myConf.setInt(MRJobConfig.NUM_MAPS, 1);
myConf.setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
Job job = new Job(myConf); Job job = new Job(myConf);

View File

@ -275,9 +275,11 @@ class LoadJob extends GridmixJob {
matcher = new ResourceUsageMatcherRunner(ctxt, matcher = new ResourceUsageMatcherRunner(ctxt,
split.getMapResourceUsageMetrics()); split.getMapResourceUsageMetrics());
matcher.setDaemon(true);
// start the status reporter thread // start the status reporter thread
reporter = new StatusReporter(ctxt); reporter = new StatusReporter(ctxt);
reporter.setDaemon(true);
reporter.start(); reporter.start();
} }