From bba266610cbdae66e0af8eb3ae4c8e308c0e2603 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 11 Jul 2014 08:49:54 +0000 Subject: [PATCH] MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current attempt is the last retry. Contributed by Wangda Tan. svn merge --ignore-ancestry -c 1609649 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1609650 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 25 ++++--------- .../mapreduce/v2/app/rm/RMCommunicator.java | 2 +- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 4 +-- .../mapreduce/v2/app/TestJobEndNotifier.java | 17 +++++---- .../mapreduce/v2/app/TestMRAppMaster.java | 34 ++++++++---------- .../mapreduce/v2/app/TestStagingCleanup.java | 35 +++++++++++-------- 7 files changed, 59 insertions(+), 61 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ff120d95ea5..aa59e5cd853 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -15,6 +15,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5866. TestFixedLengthInputFormat fails in windows. (Varun Vasudev via cnauroth) + MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current + attempt is the last retry. (Wangda Tan via zjshen) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 491cd8009e6..b6090343a15 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -184,7 +184,6 @@ public class MRAppMaster extends CompositeService { private final int nmPort; private final int nmHttpPort; protected final MRAppMetrics metrics; - private final int maxAppAttempts; private Map completedTasksFromPreviousRun; private List amInfos; private AppContext context; @@ -224,14 +223,14 @@ public class MRAppMaster extends CompositeService { public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, - long appSubmitTime, int maxAppAttempts) { + long appSubmitTime) { this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, - new SystemClock(), appSubmitTime, maxAppAttempts); + new SystemClock(), appSubmitTime); } public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, - Clock clock, long appSubmitTime, int maxAppAttempts) { + Clock clock, long appSubmitTime) { super(MRAppMaster.class.getName()); this.clock = clock; this.startTime = clock.getTime(); @@ -242,7 +241,6 @@ public class MRAppMaster extends CompositeService { this.nmPort = nmPort; this.nmHttpPort = nmHttpPort; this.metrics = MRAppMetrics.create(); - this.maxAppAttempts = maxAppAttempts; logSyncer = TaskLog.createLogSyncer(); LOG.info("Created MRAppMaster for application " + applicationAttemptId); } @@ -255,12 +253,6 @@ public class MRAppMaster extends CompositeService { context = new RunningAppContext(conf); - ((RunningAppContext)context).computeIsLastAMRetry(); - LOG.info("The specific max attempts: " + maxAppAttempts + - " for application: " + appAttemptID.getApplicationId().getId() + - ". Attempt num: " + appAttemptID.getAttemptId() + - " is last retry: " + isLastAMRetry); - // Job name is the same as the app name util we support DAG of jobs // for an app later appName = conf.get(MRJobConfig.JOB_NAME, ""); @@ -993,8 +985,8 @@ public class MRAppMaster extends CompositeService { successfullyUnregistered.set(true); } - public void computeIsLastAMRetry() { - isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; + public void resetIsLastAMRetry() { + isLastAMRetry = false; } } @@ -1374,8 +1366,6 @@ public class MRAppMaster extends CompositeService { System.getenv(Environment.NM_HTTP_PORT.name()); String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); - String maxAppAttempts = - System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV); validateInputParam(containerIdStr, Environment.CONTAINER_ID.name()); @@ -1385,8 +1375,6 @@ public class MRAppMaster extends CompositeService { Environment.NM_HTTP_PORT.name()); validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV); - validateInputParam(maxAppAttempts, - ApplicationConstants.MAX_APP_ATTEMPTS_ENV); ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ApplicationAttemptId applicationAttemptId = @@ -1397,8 +1385,7 @@ public class MRAppMaster extends CompositeService { MRAppMaster appMaster = new MRAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), - Integer.parseInt(nodeHttpPortString), appSubmitTime, - Integer.parseInt(maxAppAttempts)); + Integer.parseInt(nodeHttpPortString), appSubmitTime); ShutdownHookManager.get().addShutdownHook( new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); JobConf conf = new JobConf(new YarnConfiguration()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 8dac19b0d9c..e435009c6a7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -185,7 +185,7 @@ public abstract class RMCommunicator extends AbstractService // if unregistration failed, isLastAMRetry needs to be recalculated // to see whether AM really has the chance to retry RunningAppContext raContext = (RunningAppContext) context; - raContext.computeIsLastAMRetry(); + raContext.resetIsLastAMRetry(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 5c1f0cf0d97..9885582d88a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -227,8 +227,8 @@ public class MRApp extends MRAppMaster { int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount, Clock clock, boolean unregistered, String assignedQueue) { - super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System - .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, + System.currentTimeMillis()); this.testWorkDir = new File("target", testName); testAbsPath = new Path(testWorkDir.getAbsolutePath()); LOG.info("PathUsed: " + testAbsPath); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index 8f9271f5c2f..e143b25695d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -253,6 +253,12 @@ public class TestJobEndNotifier extends JobEndNotifier { HttpServer2 server = startHttpServer(); MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false, this.getClass().getName(), true, 2, false)); + // Currently, we will have isLastRetry always equals to false at beginning + // of MRAppMaster, except staging area exists or commit already started at + // the beginning. + // Now manually set isLastRetry to true and this should reset to false when + // unregister failed. + app.isLastAMRetry = true; doNothing().when(app).sysexit(); JobConf conf = new JobConf(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, @@ -265,12 +271,11 @@ public class TestJobEndNotifier extends JobEndNotifier { // Now shutdown. User should see FAILED state. // Unregistration fails: isLastAMRetry is recalculated, this is app.shutDownJob(); - Assert.assertTrue(app.isLastAMRetry()); - Assert.assertEquals(1, JobEndServlet.calledTimes); - Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED", - JobEndServlet.requestUri.getQuery()); - Assert.assertEquals(JobState.FAILED.toString(), - JobEndServlet.foundJobState); + Assert.assertFalse(app.isLastAMRetry()); + // Since it's not last retry, JobEndServlet didn't called + Assert.assertEquals(0, JobEndServlet.calledTimes); + Assert.assertNull(JobEndServlet.requestUri); + Assert.assertNull(JobEndServlet.foundJobState); server.stop(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java index f4fa8573fa5..d356eca623d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java @@ -118,7 +118,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + System.currentTimeMillis()); JobConf conf = new JobConf(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -147,8 +147,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, - false, false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -186,8 +185,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, - false, false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -225,8 +223,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, - false, false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -264,8 +261,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, - false, false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -285,8 +281,9 @@ public class TestMRAppMaster { @Test (timeout = 30000) public void testMRAppMasterMaxAppAttempts() throws IOException, InterruptedException { - int[] maxAppAttemtps = new int[] { 1, 2, 3 }; - Boolean[] expectedBools = new Boolean[]{ true, true, false }; + // No matter what's the maxAppAttempt or attempt id, the isLastRetry always + // equals to false + Boolean[] expectedBools = new Boolean[]{ false, false, false }; String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String containerIdStr = "container_1317529182569_0004_000002_1"; @@ -301,10 +298,10 @@ public class TestMRAppMaster { File stagingDir = new File(MRApps.getStagingAreaDir(conf, userName).toString()); stagingDir.mkdirs(); - for (int i = 0; i < maxAppAttemtps.length; ++i) { + for (int i = 0; i < expectedBools.length; ++i) { MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), maxAppAttemtps[i], false, true); + System.currentTimeMillis(), false, true); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); assertEquals("isLastAMRetry is correctly computed.", expectedBools[i], appMaster.isLastAMRetry()); @@ -399,7 +396,7 @@ public class TestMRAppMaster { MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), 1, false, true); + System.currentTimeMillis(), false, true); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); // Now validate the task credentials @@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaster { public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, - long submitTime, int maxAppAttempts) { + long submitTime) { this(applicationAttemptId, containerId, host, port, httpPort, - submitTime, maxAppAttempts, true, true); + submitTime, true, true); } public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, - long submitTime, int maxAppAttempts, boolean overrideInit, + long submitTime, boolean overrideInit, boolean overrideStart) { - super(applicationAttemptId, containerId, host, port, httpPort, submitTime, - maxAppAttempts); + super(applicationAttemptId, containerId, host, port, httpPort, submitTime); this.overrideInit = overrideInit; this.overrideStart = overrideStart; mockContainerAllocator = mock(ContainerAllocator.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index dcd670a7c61..1037e7c2ba3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -28,9 +29,6 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import org.junit.Assert; -import junit.framework.TestCase; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.junit.Assert; import org.junit.Test; /** * Make sure that the job staging directory clean up happens. */ - public class TestStagingCleanup extends TestCase { + public class TestStagingCleanup { private Configuration conf = new Configuration(); private FileSystem fs; @@ -81,7 +80,7 @@ import org.junit.Test; public void testDeletionofStagingOnUnregistrationFailure() throws IOException { testDeletionofStagingOnUnregistrationFailure(2, false); - testDeletionofStagingOnUnregistrationFailure(1, true); + testDeletionofStagingOnUnregistrationFailure(1, false); } @SuppressWarnings("resource") @@ -104,7 +103,7 @@ import org.junit.Test; appMaster.init(conf); appMaster.start(); appMaster.shutDownJob(); - ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry(); + ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry(); if (shouldHaveDeleted) { Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry()); verify(fs).delete(stagingJobPath, true); @@ -164,7 +163,11 @@ import org.junit.Test; verify(fs, times(0)).delete(stagingJobPath, true); } - @Test (timeout = 30000) + // FIXME: + // Disabled this test because currently, when job state=REBOOT at shutdown + // when lastRetry = true in RM view, cleanup will not do. + // This will be supported after YARN-2261 completed +// @Test (timeout = 30000) public void testDeletionofStagingOnReboot() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); @@ -202,7 +205,7 @@ import org.junit.Test; JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); - MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); appMaster.init(conf); //simulate the process being killed MRAppMaster.MRAppMasterShutdownHook hook = @@ -210,8 +213,12 @@ import org.junit.Test; hook.run(); verify(fs, times(0)).delete(stagingJobPath, true); } - - @Test (timeout = 30000) + + // FIXME: + // Disabled this test because currently, when shutdown hook triggered at + // lastRetry in RM view, cleanup will not do. This should be supported after + // YARN-2261 completed +// @Test (timeout = 30000) public void testDeletionofStagingOnKillLastTry() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); fs = mock(FileSystem.class); @@ -226,7 +233,7 @@ import org.junit.Test; JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); - MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry appMaster.init(conf); assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry()); //simulate the process being killed @@ -245,10 +252,10 @@ import org.junit.Test; boolean crushUnregistration = false; public TestMRApp(ApplicationAttemptId applicationAttemptId, - ContainerAllocator allocator, int maxAppAttempts) { + ContainerAllocator allocator) { super(applicationAttemptId, ContainerId.newInstance( applicationAttemptId, 1), "testhost", 2222, 3333, - System.currentTimeMillis(), maxAppAttempts); + System.currentTimeMillis()); this.allocator = allocator; this.successfullyUnregistered.set(true); } @@ -256,7 +263,7 @@ import org.junit.Test; public TestMRApp(ApplicationAttemptId applicationAttemptId, ContainerAllocator allocator, JobStateInternal jobStateInternal, int maxAppAttempts) { - this(applicationAttemptId, allocator, maxAppAttempts); + this(applicationAttemptId, allocator); this.jobStateInternal = jobStateInternal; }