svn merge -c 1401726 FIXES: YARN-139. Interrupted Exception within AsyncDispatcher leads to user confusion. Contributed by Vinod Kumar Vavilapalli

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1401729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-10-24 15:25:22 +00:00
parent c4defd0cff
commit b702473f5c
4 changed files with 68 additions and 52 deletions

View File

@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
/** /**
* The Map-Reduce Application Master. * The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface. * The state machine is encapsulated in the implementation of Job interface.
@ -398,52 +400,65 @@ public class MRAppMaster extends CompositeService {
protected void sysexit() { protected void sysexit() {
System.exit(0); System.exit(0);
} }
/**
 * Shuts down the application master once its job has finished.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>If {@code MRJobConfig.MR_JOB_END_NOTIFICATION_URL} is configured,
 *       send the job-end notification through a {@code JobEndNotifier}.</li>
 *   <li>Wait five seconds so clients can observe the final job state.</li>
 *   <li>Mark this attempt as the last AM retry and stop all services.</li>
 *   <li>Call {@link #sysexit()} to bring the process down.</li>
 * </ol>
 *
 * <p>Failures in any individual step are logged and do not prevent the
 * following steps from running.
 */
@VisibleForTesting
public void shutDownJob() {
  // job has finished
  // this is the only job, so shut down the Appmaster
  // note in a workflow scenario, this may lead to creation of a new
  // job (FIXME?)

  // Send job-end notification
  if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
    try {
      LOG.info("Job end notification started for jobID : "
          + job.getReport().getJobId());
      JobEndNotifier notifier = new JobEndNotifier();
      notifier.setConf(getConfig());
      notifier.notify(job.getReport());
    } catch (InterruptedException ie) {
      // The interrupt flag is intentionally not re-asserted: a pending
      // interrupt would make the Thread.sleep below return immediately.
      LOG.warn("Job end notification interrupted for jobID : "
          + job.getReport().getJobId(), ie);
    }
  }

  // TODO:currently just wait for some time so clients can know the
  // final states. Will be removed once RM come on.
  try {
    Thread.sleep(5000);
  } catch (InterruptedException e) {
    // Report through the class logger (not stderr) so the interruption
    // shows up alongside the other shutdown messages; then carry on with
    // the remaining shutdown steps.
    LOG.warn("Interrupted while waiting for clients to learn the final"
        + " job state", e);
  }

  try {
    //We are finishing cleanly so this is the last retry
    isLastAMRetry = true;
    // Stop all services
    // This will also send the final report to the ResourceManager
    LOG.info("Calling stop for all the services");
    MRAppMaster.this.stop();
  } catch (Throwable t) {
    // Catch everything: the forced exit below must still run.
    LOG.warn("Graceful stop failed ", t);
  }

  //Bring the process down by force.
  //Not needed after HADOOP-7140
  LOG.info("Exiting MR AppMaster..GoodBye!");
  sysexit();
}
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
@Override @Override
public void handle(JobFinishEvent event) { public void handle(JobFinishEvent event) {
// job has finished // Create a new thread to shutdown the AM. We should not do it in-line
// this is the only job, so shut down the Appmaster // to avoid blocking the dispatcher itself.
// note in a workflow scenario, this may lead to creation of a new new Thread() {
// job (FIXME?)
// Send job-end notification @Override
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { public void run() {
try { shutDownJob();
LOG.info("Job end notification started for jobID : "
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
notifier.notify(job.getReport());
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie);
} }
} }.start();
// TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//We are finishing cleanly so this is the last retry
isLastAMRetry = true;
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
stop();
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");
sysexit();
} }
} }

View File

@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException; import java.io.IOException;
import junit.framework.Assert; import junit.framework.Assert;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
@ -68,7 +65,6 @@ import org.junit.Test;
private Path stagingJobPath = new Path(stagingJobDir); private Path stagingJobPath = new Path(stagingJobDir);
private final static RecordFactory recordFactory = RecordFactoryProvider. private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null); getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
@Test @Test
public void testDeletionofStaging() throws IOException { public void testDeletionofStaging() throws IOException {
@ -86,9 +82,7 @@ import org.junit.Test;
jobid.setAppId(appId); jobid.setAppId(appId);
MRAppMaster appMaster = new TestMRApp(attemptId); MRAppMaster appMaster = new TestMRApp(attemptId);
appMaster.init(conf); appMaster.init(conf);
EventHandler<JobFinishEvent> handler = appMaster.shutDownJob();
appMaster.createJobFinishEventHandler();
handler.handle(new JobFinishEvent(jobid));
verify(fs).delete(stagingJobPath, true); verify(fs).delete(stagingJobPath, true);
} }

View File

@ -165,6 +165,9 @@ Release 0.23.5 - UNRELEASED
YARN-180. Capacity scheduler - containers that get reserved create YARN-180. Capacity scheduler - containers that get reserved create
container token to early (acmurthy and bobby) container token to early (acmurthy and bobby)
YARN-139. Interrupted Exception within AsyncDispatcher leads to user
confusion. (Vinod Kumar Vavilapalli via jlowe)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -68,7 +68,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try { try {
event = eventQueue.take(); event = eventQueue.take();
} catch(InterruptedException ie) { } catch(InterruptedException ie) {
LOG.warn("AsyncDispatcher thread interrupted", ie); if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return; return;
} }
if (event != null) { if (event != null) {
@ -180,7 +182,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try { try {
eventQueue.put(event); eventQueue.put(event);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("AsyncDispatcher thread interrupted", e); if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
throw new YarnException(e); throw new YarnException(e);
} }
}; };