MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current attempt is the last retry. Contributed by Wangda Tan.

svn merge --ignore-ancestry -c 1609649 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1609650 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-07-11 08:49:54 +00:00
parent e4f494a767
commit bba266610c
7 changed files with 59 additions and 61 deletions

View File

@ -15,6 +15,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5866. TestFixedLengthInputFormat fails in windows.
(Varun Vasudev via cnauroth)
MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
attempt is the last retry. (Wangda Tan via zjshen)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -184,7 +184,6 @@ public class MRAppMaster extends CompositeService {
private final int nmPort;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
private final int maxAppAttempts;
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private List<AMInfo> amInfos;
private AppContext context;
@ -224,14 +223,14 @@ public class MRAppMaster extends CompositeService {
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, int maxAppAttempts) {
long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, maxAppAttempts);
new SystemClock(), appSubmitTime);
}
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, int maxAppAttempts) {
Clock clock, long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@ -242,7 +241,6 @@ public MRAppMaster(ApplicationAttemptId applicationAttemptId,
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
this.maxAppAttempts = maxAppAttempts;
logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@ -255,12 +253,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
context = new RunningAppContext(conf);
((RunningAppContext)context).computeIsLastAMRetry();
LOG.info("The specific max attempts: " + maxAppAttempts +
" for application: " + appAttemptID.getApplicationId().getId() +
". Attempt num: " + appAttemptID.getAttemptId() +
" is last retry: " + isLastAMRetry);
// Job name is the same as the app name util we support DAG of jobs
// for an app later
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@ -993,8 +985,8 @@ public void markSuccessfulUnregistration() {
successfullyUnregistered.set(true);
}
public void computeIsLastAMRetry() {
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
public void resetIsLastAMRetry() {
isLastAMRetry = false;
}
}
@ -1374,8 +1366,6 @@ public static void main(String[] args) {
System.getenv(Environment.NM_HTTP_PORT.name());
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
String maxAppAttempts =
System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
validateInputParam(containerIdStr,
Environment.CONTAINER_ID.name());
@ -1385,8 +1375,6 @@ public static void main(String[] args) {
Environment.NM_HTTP_PORT.name());
validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
validateInputParam(maxAppAttempts,
ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
@ -1397,8 +1385,7 @@ public static void main(String[] args) {
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime,
Integer.parseInt(maxAppAttempts));
Integer.parseInt(nodeHttpPortString), appSubmitTime);
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
JobConf conf = new JobConf(new YarnConfiguration());

View File

@ -185,7 +185,7 @@ protected void unregister() {
// if unregistration failed, isLastAMRetry needs to be recalculated
// to see whether AM really has the chance to retry
RunningAppContext raContext = (RunningAppContext) context;
raContext.computeIsLastAMRetry();
raContext.resetIsLastAMRetry();
}
}

View File

@ -227,8 +227,8 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
String assignedQueue) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock,
System.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);

View File

@ -253,6 +253,12 @@ public void testNotificationOnLastRetryUnregistrationFailure()
HttpServer2 server = startHttpServer();
MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
this.getClass().getName(), true, 2, false));
// Currently, we will have isLastRetry always equals to false at beginning
// of MRAppMaster, except staging area exists or commit already started at
// the beginning.
// Now manually set isLastRetry to true and this should reset to false when
// unregister failed.
app.isLastAMRetry = true;
doNothing().when(app).sysexit();
JobConf conf = new JobConf();
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
@ -265,12 +271,11 @@ public void testNotificationOnLastRetryUnregistrationFailure()
// Now shutdown. User should see FAILED state.
// Unregistration fails: isLastAMRetry is recalculated, this is
app.shutDownJob();
Assert.assertTrue(app.isLastAMRetry());
Assert.assertEquals(1, JobEndServlet.calledTimes);
Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
JobEndServlet.requestUri.getQuery());
Assert.assertEquals(JobState.FAILED.toString(),
JobEndServlet.foundJobState);
Assert.assertFalse(app.isLastAMRetry());
// Since it's not last retry, JobEndServlet didn't called
Assert.assertEquals(0, JobEndServlet.calledTimes);
Assert.assertNull(JobEndServlet.requestUri);
Assert.assertNull(JobEndServlet.foundJobState);
server.stop();
}

View File

@ -118,7 +118,7 @@ public void testMRAppMasterForDifferentUser() throws IOException,
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
System.currentTimeMillis());
JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -147,8 +147,7 @@ public void testMRAppMasterMidLock() throws IOException,
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
false, false);
System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -186,8 +185,7 @@ public void testMRAppMasterSuccessLock() throws IOException,
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
false, false);
System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -225,8 +223,7 @@ public void testMRAppMasterFailLock() throws IOException,
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
false, false);
System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -264,8 +261,7 @@ public void testMRAppMasterMissingStaging() throws IOException,
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
false, false);
System.currentTimeMillis(), false, false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@ -285,8 +281,9 @@ public void testMRAppMasterMissingStaging() throws IOException,
@Test (timeout = 30000)
public void testMRAppMasterMaxAppAttempts() throws IOException,
InterruptedException {
int[] maxAppAttemtps = new int[] { 1, 2, 3 };
Boolean[] expectedBools = new Boolean[]{ true, true, false };
// No matter what's the maxAppAttempt or attempt id, the isLastRetry always
// equals to false
Boolean[] expectedBools = new Boolean[]{ false, false, false };
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
@ -301,10 +298,10 @@ public void testMRAppMasterMaxAppAttempts() throws IOException,
File stagingDir =
new File(MRApps.getStagingAreaDir(conf, userName).toString());
stagingDir.mkdirs();
for (int i = 0; i < maxAppAttemtps.length; ++i) {
for (int i = 0; i < expectedBools.length; ++i) {
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), maxAppAttemtps[i], false, true);
System.currentTimeMillis(), false, true);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
appMaster.isLastAMRetry());
@ -399,7 +396,7 @@ public void testMRAppMasterCredentials() throws Exception {
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), 1, false, true);
System.currentTimeMillis(), false, true);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
// Now validate the task credentials
@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaster {
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime, int maxAppAttempts) {
long submitTime) {
this(applicationAttemptId, containerId, host, port, httpPort,
submitTime, maxAppAttempts, true, true);
submitTime, true, true);
}
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime, int maxAppAttempts, boolean overrideInit,
long submitTime, boolean overrideInit,
boolean overrideStart) {
super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
maxAppAttempts);
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
this.overrideInit = overrideInit;
this.overrideStart = overrideStart;
mockContainerAllocator = mock(ContainerAllocator.class);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
@ -28,9 +29,6 @@
import java.io.IOException;
import org.junit.Assert;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -62,13 +60,14 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Assert;
import org.junit.Test;
/**
* Make sure that the job staging directory clean up happens.
*/
public class TestStagingCleanup extends TestCase {
public class TestStagingCleanup {
private Configuration conf = new Configuration();
private FileSystem fs;
@ -81,7 +80,7 @@ public class TestStagingCleanup extends TestCase {
public void testDeletionofStagingOnUnregistrationFailure()
throws IOException {
testDeletionofStagingOnUnregistrationFailure(2, false);
testDeletionofStagingOnUnregistrationFailure(1, true);
testDeletionofStagingOnUnregistrationFailure(1, false);
}
@SuppressWarnings("resource")
@ -104,7 +103,7 @@ private void testDeletionofStagingOnUnregistrationFailure(
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
if (shouldHaveDeleted) {
Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
verify(fs).delete(stagingJobPath, true);
@ -164,7 +163,11 @@ public void testNoDeletionofStagingOnReboot() throws IOException {
verify(fs, times(0)).delete(stagingJobPath, true);
}
@Test (timeout = 30000)
// FIXME:
// Disabled this test because currently, when job state=REBOOT at shutdown
// when lastRetry = true in RM view, cleanup will not do.
// This will be supported after YARN-2261 completed
// @Test (timeout = 30000)
public void testDeletionofStagingOnReboot() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
@ -202,7 +205,7 @@ public void testDeletionofStagingOnKill() throws IOException {
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
appMaster.init(conf);
//simulate the process being killed
MRAppMaster.MRAppMasterShutdownHook hook =
@ -211,7 +214,11 @@ public void testDeletionofStagingOnKill() throws IOException {
verify(fs, times(0)).delete(stagingJobPath, true);
}
@Test (timeout = 30000)
// FIXME:
// Disabled this test because currently, when shutdown hook triggered at
// lastRetry in RM view, cleanup will not do. This should be supported after
// YARN-2261 completed
// @Test (timeout = 30000)
public void testDeletionofStagingOnKillLastTry() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
@ -226,7 +233,7 @@ public void testDeletionofStagingOnKillLastTry() throws IOException {
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
appMaster.init(conf);
assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
//simulate the process being killed
@ -245,10 +252,10 @@ private class TestMRApp extends MRAppMaster {
boolean crushUnregistration = false;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) {
ContainerAllocator allocator) {
super(applicationAttemptId, ContainerId.newInstance(
applicationAttemptId, 1), "testhost", 2222, 3333,
System.currentTimeMillis(), maxAppAttempts);
System.currentTimeMillis());
this.allocator = allocator;
this.successfullyUnregistered.set(true);
}
@ -256,7 +263,7 @@ public TestMRApp(ApplicationAttemptId applicationAttemptId,
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, JobStateInternal jobStateInternal,
int maxAppAttempts) {
this(applicationAttemptId, allocator, maxAppAttempts);
this(applicationAttemptId, allocator);
this.jobStateInternal = jobStateInternal;
}