MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current attempt is the last retry. Contributed by Wangda Tan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1609649 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
952b0fb18c
commit
64306aa1b5
|
@ -160,6 +160,9 @@ Release 2.6.0 - UNRELEASED
|
|||
MAPREDUCE-5866. TestFixedLengthInputFormat fails in windows.
|
||||
(Varun Vasudev via cnauroth)
|
||||
|
||||
MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
|
||||
attempt is the last retry. (Wangda Tan via zjshen)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -186,7 +186,6 @@ public class MRAppMaster extends CompositeService {
|
|||
private final int nmPort;
|
||||
private final int nmHttpPort;
|
||||
protected final MRAppMetrics metrics;
|
||||
private final int maxAppAttempts;
|
||||
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
|
||||
private List<AMInfo> amInfos;
|
||||
private AppContext context;
|
||||
|
@ -227,14 +226,14 @@ public class MRAppMaster extends CompositeService {
|
|||
|
||||
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
|
||||
long appSubmitTime, int maxAppAttempts) {
|
||||
long appSubmitTime) {
|
||||
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
|
||||
new SystemClock(), appSubmitTime, maxAppAttempts);
|
||||
new SystemClock(), appSubmitTime);
|
||||
}
|
||||
|
||||
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
|
||||
Clock clock, long appSubmitTime, int maxAppAttempts) {
|
||||
Clock clock, long appSubmitTime) {
|
||||
super(MRAppMaster.class.getName());
|
||||
this.clock = clock;
|
||||
this.startTime = clock.getTime();
|
||||
|
@ -245,7 +244,6 @@ public class MRAppMaster extends CompositeService {
|
|||
this.nmPort = nmPort;
|
||||
this.nmHttpPort = nmHttpPort;
|
||||
this.metrics = MRAppMetrics.create();
|
||||
this.maxAppAttempts = maxAppAttempts;
|
||||
logSyncer = TaskLog.createLogSyncer();
|
||||
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
|
||||
}
|
||||
|
@ -258,12 +256,6 @@ public class MRAppMaster extends CompositeService {
|
|||
|
||||
context = new RunningAppContext(conf);
|
||||
|
||||
((RunningAppContext)context).computeIsLastAMRetry();
|
||||
LOG.info("The specific max attempts: " + maxAppAttempts +
|
||||
" for application: " + appAttemptID.getApplicationId().getId() +
|
||||
". Attempt num: " + appAttemptID.getAttemptId() +
|
||||
" is last retry: " + isLastAMRetry);
|
||||
|
||||
// Job name is the same as the app name util we support DAG of jobs
|
||||
// for an app later
|
||||
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
|
||||
|
@ -1007,8 +999,8 @@ public class MRAppMaster extends CompositeService {
|
|||
successfullyUnregistered.set(true);
|
||||
}
|
||||
|
||||
public void computeIsLastAMRetry() {
|
||||
isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
||||
public void resetIsLastAMRetry() {
|
||||
isLastAMRetry = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1388,8 +1380,6 @@ public class MRAppMaster extends CompositeService {
|
|||
System.getenv(Environment.NM_HTTP_PORT.name());
|
||||
String appSubmitTimeStr =
|
||||
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
|
||||
String maxAppAttempts =
|
||||
System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
|
||||
|
||||
validateInputParam(containerIdStr,
|
||||
Environment.CONTAINER_ID.name());
|
||||
|
@ -1399,8 +1389,6 @@ public class MRAppMaster extends CompositeService {
|
|||
Environment.NM_HTTP_PORT.name());
|
||||
validateInputParam(appSubmitTimeStr,
|
||||
ApplicationConstants.APP_SUBMIT_TIME_ENV);
|
||||
validateInputParam(maxAppAttempts,
|
||||
ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
|
||||
|
||||
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
ApplicationAttemptId applicationAttemptId =
|
||||
|
@ -1411,8 +1399,7 @@ public class MRAppMaster extends CompositeService {
|
|||
MRAppMaster appMaster =
|
||||
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
|
||||
Integer.parseInt(nodePortString),
|
||||
Integer.parseInt(nodeHttpPortString), appSubmitTime,
|
||||
Integer.parseInt(maxAppAttempts));
|
||||
Integer.parseInt(nodeHttpPortString), appSubmitTime);
|
||||
ShutdownHookManager.get().addShutdownHook(
|
||||
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
|
||||
JobConf conf = new JobConf(new YarnConfiguration());
|
||||
|
|
|
@ -185,7 +185,7 @@ public abstract class RMCommunicator extends AbstractService
|
|||
// if unregistration failed, isLastAMRetry needs to be recalculated
|
||||
// to see whether AM really has the chance to retry
|
||||
RunningAppContext raContext = (RunningAppContext) context;
|
||||
raContext.computeIsLastAMRetry();
|
||||
raContext.resetIsLastAMRetry();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -228,8 +228,8 @@ public class MRApp extends MRAppMaster {
|
|||
int maps, int reduces, boolean autoComplete, String testName,
|
||||
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
|
||||
String assignedQueue) {
|
||||
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
|
||||
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
|
||||
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock,
|
||||
System.currentTimeMillis());
|
||||
this.testWorkDir = new File("target", testName);
|
||||
testAbsPath = new Path(testWorkDir.getAbsolutePath());
|
||||
LOG.info("PathUsed: " + testAbsPath);
|
||||
|
|
|
@ -253,6 +253,12 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
HttpServer2 server = startHttpServer();
|
||||
MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
|
||||
this.getClass().getName(), true, 2, false));
|
||||
// Currently, we will have isLastRetry always equals to false at beginning
|
||||
// of MRAppMaster, except staging area exists or commit already started at
|
||||
// the beginning.
|
||||
// Now manually set isLastRetry to true and this should reset to false when
|
||||
// unregister failed.
|
||||
app.isLastAMRetry = true;
|
||||
doNothing().when(app).sysexit();
|
||||
JobConf conf = new JobConf();
|
||||
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
|
||||
|
@ -265,12 +271,11 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
// Now shutdown. User should see FAILED state.
|
||||
// Unregistration fails: isLastAMRetry is recalculated, this is
|
||||
app.shutDownJob();
|
||||
Assert.assertTrue(app.isLastAMRetry());
|
||||
Assert.assertEquals(1, JobEndServlet.calledTimes);
|
||||
Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
|
||||
JobEndServlet.requestUri.getQuery());
|
||||
Assert.assertEquals(JobState.FAILED.toString(),
|
||||
JobEndServlet.foundJobState);
|
||||
Assert.assertFalse(app.isLastAMRetry());
|
||||
// Since it's not last retry, JobEndServlet didn't called
|
||||
Assert.assertEquals(0, JobEndServlet.calledTimes);
|
||||
Assert.assertNull(JobEndServlet.requestUri);
|
||||
Assert.assertNull(JobEndServlet.foundJobState);
|
||||
server.stop();
|
||||
}
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ public class TestMRAppMaster {
|
|||
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
MRAppMasterTest appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
|
||||
System.currentTimeMillis());
|
||||
JobConf conf = new JobConf();
|
||||
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
|
@ -147,8 +147,7 @@ public class TestMRAppMaster {
|
|||
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
MRAppMaster appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
|
||||
false, false);
|
||||
System.currentTimeMillis(), false, false);
|
||||
boolean caught = false;
|
||||
try {
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
|
@ -186,8 +185,7 @@ public class TestMRAppMaster {
|
|||
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
MRAppMaster appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
|
||||
false, false);
|
||||
System.currentTimeMillis(), false, false);
|
||||
boolean caught = false;
|
||||
try {
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
|
@ -225,8 +223,7 @@ public class TestMRAppMaster {
|
|||
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
MRAppMaster appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
|
||||
false, false);
|
||||
System.currentTimeMillis(), false, false);
|
||||
boolean caught = false;
|
||||
try {
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
|
@ -264,8 +261,7 @@ public class TestMRAppMaster {
|
|||
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
||||
MRAppMaster appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
|
||||
false, false);
|
||||
System.currentTimeMillis(), false, false);
|
||||
boolean caught = false;
|
||||
try {
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
|
@ -285,8 +281,9 @@ public class TestMRAppMaster {
|
|||
@Test (timeout = 30000)
|
||||
public void testMRAppMasterMaxAppAttempts() throws IOException,
|
||||
InterruptedException {
|
||||
int[] maxAppAttemtps = new int[] { 1, 2, 3 };
|
||||
Boolean[] expectedBools = new Boolean[]{ true, true, false };
|
||||
// No matter what's the maxAppAttempt or attempt id, the isLastRetry always
|
||||
// equals to false
|
||||
Boolean[] expectedBools = new Boolean[]{ false, false, false };
|
||||
|
||||
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
|
||||
String containerIdStr = "container_1317529182569_0004_000002_1";
|
||||
|
@ -301,10 +298,10 @@ public class TestMRAppMaster {
|
|||
File stagingDir =
|
||||
new File(MRApps.getStagingAreaDir(conf, userName).toString());
|
||||
stagingDir.mkdirs();
|
||||
for (int i = 0; i < maxAppAttemtps.length; ++i) {
|
||||
for (int i = 0; i < expectedBools.length; ++i) {
|
||||
MRAppMasterTest appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), maxAppAttemtps[i], false, true);
|
||||
System.currentTimeMillis(), false, true);
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
|
||||
appMaster.isLastAMRetry());
|
||||
|
@ -399,7 +396,7 @@ public class TestMRAppMaster {
|
|||
|
||||
MRAppMasterTest appMaster =
|
||||
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
||||
System.currentTimeMillis(), 1, false, true);
|
||||
System.currentTimeMillis(), false, true);
|
||||
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
||||
|
||||
// Now validate the task credentials
|
||||
|
@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaster {
|
|||
|
||||
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerId containerId, String host, int port, int httpPort,
|
||||
long submitTime, int maxAppAttempts) {
|
||||
long submitTime) {
|
||||
this(applicationAttemptId, containerId, host, port, httpPort,
|
||||
submitTime, maxAppAttempts, true, true);
|
||||
submitTime, true, true);
|
||||
}
|
||||
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerId containerId, String host, int port, int httpPort,
|
||||
long submitTime, int maxAppAttempts, boolean overrideInit,
|
||||
long submitTime, boolean overrideInit,
|
||||
boolean overrideStart) {
|
||||
super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
|
||||
maxAppAttempts);
|
||||
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
|
||||
this.overrideInit = overrideInit;
|
||||
this.overrideStart = overrideStart;
|
||||
mockContainerAllocator = mock(ContainerAllocator.class);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -28,9 +29,6 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Assert;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
/**
|
||||
* Make sure that the job staging directory clean up happens.
|
||||
*/
|
||||
public class TestStagingCleanup extends TestCase {
|
||||
public class TestStagingCleanup {
|
||||
|
||||
private Configuration conf = new Configuration();
|
||||
private FileSystem fs;
|
||||
|
@ -81,7 +80,7 @@ import org.junit.Test;
|
|||
public void testDeletionofStagingOnUnregistrationFailure()
|
||||
throws IOException {
|
||||
testDeletionofStagingOnUnregistrationFailure(2, false);
|
||||
testDeletionofStagingOnUnregistrationFailure(1, true);
|
||||
testDeletionofStagingOnUnregistrationFailure(1, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
|
@ -104,7 +103,7 @@ import org.junit.Test;
|
|||
appMaster.init(conf);
|
||||
appMaster.start();
|
||||
appMaster.shutDownJob();
|
||||
((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
|
||||
((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
|
||||
if (shouldHaveDeleted) {
|
||||
Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
|
||||
verify(fs).delete(stagingJobPath, true);
|
||||
|
@ -164,7 +163,11 @@ import org.junit.Test;
|
|||
verify(fs, times(0)).delete(stagingJobPath, true);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
// FIXME:
|
||||
// Disabled this test because currently, when job state=REBOOT at shutdown
|
||||
// when lastRetry = true in RM view, cleanup will not do.
|
||||
// This will be supported after YARN-2261 completed
|
||||
// @Test (timeout = 30000)
|
||||
public void testDeletionofStagingOnReboot() throws IOException {
|
||||
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
||||
fs = mock(FileSystem.class);
|
||||
|
@ -202,7 +205,7 @@ import org.junit.Test;
|
|||
JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
||||
jobid.setAppId(appId);
|
||||
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
|
||||
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
|
||||
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
|
||||
appMaster.init(conf);
|
||||
//simulate the process being killed
|
||||
MRAppMaster.MRAppMasterShutdownHook hook =
|
||||
|
@ -210,8 +213,12 @@ import org.junit.Test;
|
|||
hook.run();
|
||||
verify(fs, times(0)).delete(stagingJobPath, true);
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
|
||||
// FIXME:
|
||||
// Disabled this test because currently, when shutdown hook triggered at
|
||||
// lastRetry in RM view, cleanup will not do. This should be supported after
|
||||
// YARN-2261 completed
|
||||
// @Test (timeout = 30000)
|
||||
public void testDeletionofStagingOnKillLastTry() throws IOException {
|
||||
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
||||
fs = mock(FileSystem.class);
|
||||
|
@ -226,7 +233,7 @@ import org.junit.Test;
|
|||
JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
||||
jobid.setAppId(appId);
|
||||
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
|
||||
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
|
||||
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
|
||||
appMaster.init(conf);
|
||||
assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
|
||||
//simulate the process being killed
|
||||
|
@ -245,10 +252,10 @@ import org.junit.Test;
|
|||
boolean crushUnregistration = false;
|
||||
|
||||
public TestMRApp(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerAllocator allocator, int maxAppAttempts) {
|
||||
ContainerAllocator allocator) {
|
||||
super(applicationAttemptId, ContainerId.newInstance(
|
||||
applicationAttemptId, 1), "testhost", 2222, 3333,
|
||||
System.currentTimeMillis(), maxAppAttempts);
|
||||
System.currentTimeMillis());
|
||||
this.allocator = allocator;
|
||||
this.successfullyUnregistered.set(true);
|
||||
}
|
||||
|
@ -256,7 +263,7 @@ import org.junit.Test;
|
|||
public TestMRApp(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerAllocator allocator, JobStateInternal jobStateInternal,
|
||||
int maxAppAttempts) {
|
||||
this(applicationAttemptId, allocator, maxAppAttempts);
|
||||
this(applicationAttemptId, allocator);
|
||||
this.jobStateInternal = jobStateInternal;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue