From 46315a2d914058969c7234272420c063ce268bf5 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 25 Mar 2013 22:33:43 +0000 Subject: [PATCH] MAPREDUCE-5062. Fix MR AM to read max-retries from the RM. Contributed by *Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1460923 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 42 +++++---- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 2 +- .../mapreduce/v2/app/TestMRAppMaster.java | 89 +++++++++++++++---- .../mapreduce/v2/app/TestStagingCleanup.java | 21 +++-- .../apache/hadoop/mapreduce/MRJobConfig.java | 8 ++ .../src/main/resources/mapred-default.xml | 8 ++ .../org/apache/hadoop/mapred/YARNRunner.java | 3 + 8 files changed, 133 insertions(+), 43 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 771ea575828..5f8d73ff69f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -253,6 +253,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5083. MiniMRCluster should use a random component when creating an actual cluster (Siddharth Seth via hitesh) + MAPREDUCE-5062. Fix MR AM to read max-retries from the RM. (Zhijie Shen via + vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 8598706dde4..fe7aa4cedbc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -161,6 +161,7 @@ public class MRAppMaster extends CompositeService { private final int nmPort; private final int nmHttpPort; protected final MRAppMetrics metrics; + private final int maxAppAttempts; private Map completedTasksFromPreviousRun; private List amInfos; private AppContext context; @@ -194,14 +195,14 @@ public class MRAppMaster extends CompositeService { public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, - long appSubmitTime) { + long appSubmitTime, int maxAppAttempts) { this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, - new SystemClock(), appSubmitTime); + new SystemClock(), appSubmitTime, maxAppAttempts); } public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, - Clock clock, long appSubmitTime) { + Clock clock, long appSubmitTime, int maxAppAttempts) { super(MRAppMaster.class.getName()); this.clock = clock; this.startTime = clock.getTime(); @@ -212,6 +213,7 @@ public class MRAppMaster extends CompositeService { this.nmPort = nmPort; this.nmHttpPort = nmHttpPort; this.metrics = MRAppMetrics.create(); + this.maxAppAttempts = maxAppAttempts; LOG.info("Created MRAppMaster for application " + applicationAttemptId); } @@ -220,18 +222,13 @@ public class MRAppMaster extends CompositeService { conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); downloadTokensAndSetupUGI(conf); - - //TODO this is a hack, we really need the RM to inform us when we - // are the last one. This would allow us to configure retries on - // a per application basis. - int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, - YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES); - isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries; - LOG.info("AM Retries: " + numAMRetries + - " attempt num: " + appAttemptID.getAttemptId() + + + isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts; + LOG.info("The specific max attempts: " + maxAppAttempts + + " for application: " + appAttemptID.getApplicationId().getId() + + ". Attempt num: " + appAttemptID.getAttemptId() + " is last retry: " + isLastAMRetry); - - + context = new RunningAppContext(conf); // Job name is the same as the app name util we support DAG of jobs @@ -266,6 +263,9 @@ public class MRAppMaster extends CompositeService { boolean commitFailure = fs.exists(endCommitFailureFile); if(!stagingExists) { isLastAMRetry = true; + LOG.info("Attempt num: " + appAttemptID.getAttemptId() + + " is last retry: " + isLastAMRetry + + " because the staging dir doesn't exist."); errorHappenedShutDown = true; forcedState = JobStateInternal.ERROR; shutDownMessage = "Staging dir does not exist " + stagingDir; @@ -275,6 +275,9 @@ public class MRAppMaster extends CompositeService { // what result we will use to notify, and how we will unregister errorHappenedShutDown = true; isLastAMRetry = true; + LOG.info("Attempt num: " + appAttemptID.getAttemptId() + + " is last retry: " + isLastAMRetry + + " because a commit was started."); copyHistory = true; if (commitSuccess) { shutDownMessage = "We crashed after successfully committing. Recovering."; @@ -777,6 +780,10 @@ public class MRAppMaster extends CompositeService { return taskAttemptListener; } + public Boolean isLastAMRetry() { + return isLastAMRetry; + } + /** * By the time life-cycle of this router starts, job-init would have already * happened. @@ -1206,6 +1213,8 @@ public class MRAppMaster extends CompositeService { System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV); String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); + String maxAppAttempts = + System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV); validateInputParam(containerIdStr, ApplicationConstants.AM_CONTAINER_ID_ENV); @@ -1215,6 +1224,8 @@ public class MRAppMaster extends CompositeService { ApplicationConstants.NM_HTTP_PORT_ENV); validateInputParam(appSubmitTimeStr, ApplicationConstants.APP_SUBMIT_TIME_ENV); + validateInputParam(maxAppAttempts, + ApplicationConstants.MAX_APP_ATTEMPTS_ENV); ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ApplicationAttemptId applicationAttemptId = @@ -1224,7 +1235,8 @@ public class MRAppMaster extends CompositeService { MRAppMaster appMaster = new MRAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), - Integer.parseInt(nodeHttpPortString), appSubmitTime); + Integer.parseInt(nodeHttpPortString), appSubmitTime, + Integer.parseInt(maxAppAttempts)); ShutdownHookManager.get().addShutdownHook( new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); YarnConfiguration conf = new YarnConfiguration(new JobConf()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 0f24666e6f5..4a28ab00637 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -192,7 +192,7 @@ public class MRApp extends MRAppMaster { int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount, Clock clock) { super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System - .currentTimeMillis()); + .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); this.testWorkDir = new File("target", testName); testAbsPath = new Path(testWorkDir.getAbsolutePath()); LOG.info("PathUsed: " + testAbsPath); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java index 2ac20701879..e0c618b556b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java @@ -30,11 +30,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; +import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; @@ -42,6 +46,7 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Before; import org.junit.BeforeClass; @@ -80,7 +85,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis()); + System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); YarnConfiguration conf = new YarnConfiguration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -109,7 +114,8 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, + false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -144,7 +150,8 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, + false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -179,7 +186,8 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, + false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -214,7 +222,8 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS, + false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -230,36 +239,73 @@ public class TestMRAppMaster { assertEquals(JobStateInternal.ERROR, appMaster.forcedState); appMaster.stop(); } + + @Test (timeout = 30000) + public void testMRAppMasterMaxAppAttempts() throws IOException, + InterruptedException { + int[] maxAppAttemtps = new int[] { 1, 2, 3 }; + Boolean[] expectedBools = new Boolean[]{ true, true, false }; + + String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; + String containerIdStr = "container_1317529182569_0004_000002_1"; + + String userName = "TestAppMasterUser"; + ApplicationAttemptId applicationAttemptId = ConverterUtils + .toApplicationAttemptId(applicationAttemptIdStr); + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + YarnConfiguration conf = new YarnConfiguration(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + + File stagingDir = + new File(MRApps.getStagingAreaDir(conf, userName).toString()); + stagingDir.mkdirs(); + for (int i = 0; i < maxAppAttemtps.length; ++i) { + MRAppMasterTest appMaster = + new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, + System.currentTimeMillis(), maxAppAttemtps[i], false, true); + MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); + assertEquals("isLastAMRetry is correctly computed.", expectedBools[i], + appMaster.isLastAMRetry()); + } + } + } class MRAppMasterTest extends MRAppMaster { Path stagingDirPath; private Configuration conf; - private boolean overrideInitAndStart; + private boolean overrideInit; + private boolean overrideStart; ContainerAllocator mockContainerAllocator; + CommitterEventHandler mockCommitterEventHandler; + RMHeartbeatHandler mockRMHeartbeatHandler; public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, - long submitTime) { - this(applicationAttemptId, containerId, host, port, httpPort, submitTime, - true); + long submitTime, int maxAppAttempts) { + this(applicationAttemptId, containerId, host, port, httpPort, + submitTime, maxAppAttempts, true, true); } public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, - long submitTime, boolean overrideInitAndStart) { - super(applicationAttemptId, containerId, host, port, httpPort, submitTime); - this.overrideInitAndStart = overrideInitAndStart; + long submitTime, int maxAppAttempts, boolean overrideInit, + boolean overrideStart) { + super(applicationAttemptId, containerId, host, port, httpPort, submitTime, + maxAppAttempts); + this.overrideInit = overrideInit; + this.overrideStart = overrideStart; mockContainerAllocator = mock(ContainerAllocator.class); + mockCommitterEventHandler = mock(CommitterEventHandler.class); + mockRMHeartbeatHandler = mock(RMHeartbeatHandler.class); } @Override public void init(Configuration conf) { - if (overrideInitAndStart) { - this.conf = conf; - } else { + if (!overrideInit) { super.init(conf); } + this.conf = conf; } @Override @@ -277,9 +323,20 @@ class MRAppMasterTest extends MRAppMaster { return mockContainerAllocator; } + @Override + protected EventHandler createCommitterEventHandler( + AppContext context, OutputCommitter committer) { + return mockCommitterEventHandler; + } + + @Override + protected RMHeartbeatHandler getRMHeartbeatHandler() { + return mockRMHeartbeatHandler; + } + @Override public void start() { - if (overrideInitAndStart) { + if (overrideStart) { try { String user = UserGroupInformation.getCurrentUser().getShortUserName(); stagingDirPath = MRApps.getStagingAreaDir(conf, user); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 7dfc0b3f0f3..b278186766e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -49,7 +49,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; @@ -93,10 +92,9 @@ import org.junit.Test; verify(fs).delete(stagingJobPath, true); } - @Test + @Test (timeout = 30000) public void testDeletionofStagingOnKill() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); - conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4); fs = mock(FileSystem.class); when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); //Staging Dir exists @@ -113,7 +111,7 @@ import org.junit.Test; JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); - MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4); appMaster.init(conf); //simulate the process being killed MRAppMaster.MRAppMasterShutdownHook hook = @@ -122,10 +120,9 @@ import org.junit.Test; verify(fs, times(0)).delete(stagingJobPath, true); } - @Test + @Test (timeout = 30000) public void testDeletionofStagingOnKillLastTry() throws IOException { conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); - conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1); fs = mock(FileSystem.class); when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); //Staging Dir exists @@ -142,7 +139,8 @@ import org.junit.Test; JobId jobid = recordFactory.newRecordInstance(JobId.class); jobid.setAppId(appId); ContainerAllocator mockAlloc = mock(ContainerAllocator.class); - MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); appMaster.init(conf); //simulate the process being killed MRAppMaster.MRAppMasterShutdownHook hook = @@ -155,15 +153,16 @@ import org.junit.Test; ContainerAllocator allocator; public TestMRApp(ApplicationAttemptId applicationAttemptId, - ContainerAllocator allocator) { + ContainerAllocator allocator, int maxAppAttempts) { super(applicationAttemptId, BuilderUtils.newContainerId( - applicationAttemptId, 1), "testhost", 2222, 3333, System - .currentTimeMillis()); + applicationAttemptId, 1), "testhost", 2222, 3333, + System.currentTimeMillis(), maxAppAttempts); this.allocator = allocator; } public TestMRApp(ApplicationAttemptId applicationAttemptId) { - this(applicationAttemptId, null); + this(applicationAttemptId, null, + MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index becdef853fa..3f80065a82c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -663,5 +663,13 @@ public interface MRJobConfig { public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN = "^mapreduce\\.workflow\\.adjacency\\..+"; + + /** + * The maximum number of application attempts. + * It is a application-specific setting. + */ + public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts"; + + public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 1; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 83131e7a798..74db2e8b43a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -806,6 +806,14 @@ + + mapreduce.am.max-attempts + 1 + The maximum number of application attempts. It is a + application-specific setting. It should not be larger than the global number + set by resourcemanager. Otherwise, it will be override. + + mapreduce.job.end-notification.url diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index c401b93c964..e8fd18a4c8a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -481,6 +481,9 @@ public class YARNRunner implements ClientProtocol { appContext.setCancelTokensWhenComplete( conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)); appContext.setAMContainerSpec(amContainer); // AM Container + appContext.setMaxAppAttempts( + conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, + MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); return appContext; }