MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. Contributed by Vinod K. V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1516358 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
487ce6c7bc
commit
ded91b4cfa
|
@ -231,6 +231,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||
pick up the right history file for the last successful AM. (Jian He via
|
||||
vinodkv)
|
||||
|
||||
MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via
|
||||
acmurthy)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-22
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1042,11 +1042,11 @@ public class MRAppMaster extends CompositeService {
|
|||
// attempt will generate one. However that disables recovery if there
|
||||
// are reducers as the shuffle secret would be app attempt specific.
|
||||
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
||||
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
|
||||
TokenCache.getShuffleSecretKey(jobCredentials) != null);
|
||||
boolean shuffleKeyValidForRecovery =
|
||||
TokenCache.getShuffleSecretKey(jobCredentials) != null;
|
||||
|
||||
if (recoveryEnabled && recoverySupportedByCommitter
|
||||
&& shuffleKeyValidForRecovery) {
|
||||
&& (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
|
||||
LOG.info("Recovery is enabled. "
|
||||
+ "Will try to recover from previous life on best effort basis.");
|
||||
try {
|
||||
|
@ -1059,7 +1059,8 @@ public class MRAppMaster extends CompositeService {
|
|||
} else {
|
||||
LOG.info("Will not try to recover. recoveryEnabled: "
|
||||
+ recoveryEnabled + " recoverySupportedByCommitter: "
|
||||
+ recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
|
||||
+ recoverySupportedByCommitter + " numReduceTasks: "
|
||||
+ numReduceTasks + " shuffleKeyValidForRecovery: "
|
||||
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
|
||||
+ appAttemptID.getAttemptId());
|
||||
// Get the amInfos anyway, whether recovery is enabled or not
|
||||
|
|
|
@ -114,7 +114,6 @@ public class TestRecovery {
|
|||
private Text val1 = new Text("val1");
|
||||
private Text val2 = new Text("val2");
|
||||
|
||||
|
||||
/**
|
||||
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
|
||||
* completely disappears because of failed launch, one attempt gets killed and
|
||||
|
@ -316,6 +315,116 @@ public class TestRecovery {
|
|||
// available in the failed attempt should be available here
|
||||
}
|
||||
|
||||
/**
|
||||
* AM with 3 maps and 0 reduces. AM crashes after the first two tasks finish
|
||||
* and recovers completely and succeeds in the second generation.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCrashOfMapsOnlyJob() throws Exception {
|
||||
int runCount = 0;
|
||||
MRApp app =
|
||||
new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
|
||||
++runCount);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean("mapred.mapper.new-api", true);
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
|
||||
// all maps would be running
|
||||
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task mapTask1 = it.next();
|
||||
Task mapTask2 = it.next();
|
||||
Task mapTask3 = it.next();
|
||||
|
||||
// all maps must be running
|
||||
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||
app.waitForState(mapTask3, TaskState.RUNNING);
|
||||
|
||||
TaskAttempt task1Attempt =
|
||||
mapTask1.getAttempts().values().iterator().next();
|
||||
TaskAttempt task2Attempt =
|
||||
mapTask2.getAttempts().values().iterator().next();
|
||||
TaskAttempt task3Attempt =
|
||||
mapTask3.getAttempts().values().iterator().next();
|
||||
|
||||
// before sending the TA_DONE event, make sure the attempt has come to
|
||||
// RUNNING state
|
||||
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
|
||||
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
||||
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
|
||||
|
||||
// send the done signal to the 1st two maps
|
||||
app
|
||||
.getContext()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||
app
|
||||
.getContext()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||
|
||||
// wait for the first two map tasks to complete
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||
|
||||
// stop the app
|
||||
app.stop();
|
||||
|
||||
// rerun
|
||||
// in the rerun, the first two maps will be recovered from the previous run
|
||||
app =
|
||||
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||
++runCount);
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||
conf.setBoolean("mapred.mapper.new-api", true);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
// Set num-reduces explicitly in conf as recovery logic depends on it.
|
||||
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
|
||||
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
|
||||
it = job.getTasks().values().iterator();
|
||||
mapTask1 = it.next();
|
||||
mapTask2 = it.next();
|
||||
mapTask3 = it.next();
|
||||
|
||||
// first two maps will be recovered, no need to send done
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||
|
||||
app.waitForState(mapTask3, TaskState.RUNNING);
|
||||
|
||||
task3Attempt = mapTask3.getAttempts().values().iterator().next();
|
||||
// before sending the TA_DONE event, make sure the attempt has come to
|
||||
// RUNNING state
|
||||
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
|
||||
|
||||
// send the done signal to the 3rd map task
|
||||
app
|
||||
.getContext()
|
||||
.getEventHandler()
|
||||
.handle(
|
||||
new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
|
||||
.getID(), TaskAttemptEventType.TA_DONE));
|
||||
|
||||
// wait to get it completed
|
||||
app.waitForState(mapTask3, TaskState.SUCCEEDED);
|
||||
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
app.verifyCompleted();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleCrashes() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue