MAPREDUCE-3802. Added test to validate that AM can crash multiple times and still can recover successfully after MAPREDUCE-3846. (vinodkv)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1244178 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a87328dfab
commit
1903e544a4
|
@ -809,6 +809,9 @@ Release 0.23.1 - 2012-02-08
|
||||||
MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
|
MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then
|
||||||
the recovery. (vinodkv)
|
the recovery. (vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
|
||||||
|
still can recover successfully after MAPREDUCE-3846. (vinodkv)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -277,6 +277,136 @@ public class TestRecovery {
|
||||||
// available in the failed attempt should be available here
|
// available in the failed attempt should be available here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleCrashes() throws Exception {
|
||||||
|
|
||||||
|
int runCount = 0;
|
||||||
|
MRApp app =
|
||||||
|
new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
|
||||||
|
++runCount);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean("mapred.mapper.new-api", true);
|
||||||
|
conf.setBoolean("mapred.reducer.new-api", true);
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
Job job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
//all maps would be running
|
||||||
|
Assert.assertEquals("No of tasks not correct",
|
||||||
|
3, job.getTasks().size());
|
||||||
|
Iterator<Task> it = job.getTasks().values().iterator();
|
||||||
|
Task mapTask1 = it.next();
|
||||||
|
Task mapTask2 = it.next();
|
||||||
|
Task reduceTask = it.next();
|
||||||
|
|
||||||
|
// all maps must be running
|
||||||
|
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
|
||||||
|
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
||||||
|
|
||||||
|
//before sending the TA_DONE, event make sure attempt has come to
|
||||||
|
//RUNNING state
|
||||||
|
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
|
||||||
|
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
||||||
|
|
||||||
|
// reduces must be in NEW state
|
||||||
|
Assert.assertEquals("Reduce Task state not correct",
|
||||||
|
TaskState.RUNNING, reduceTask.getReport().getTaskState());
|
||||||
|
|
||||||
|
//send the done signal to the 1st map
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(
|
||||||
|
task1Attempt1.getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
//wait for first map task to complete
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// Crash the app
|
||||||
|
app.stop();
|
||||||
|
|
||||||
|
//rerun
|
||||||
|
//in rerun the 1st map will be recovered from previous run
|
||||||
|
app =
|
||||||
|
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
|
conf.setBoolean("mapred.mapper.new-api", true);
|
||||||
|
conf.setBoolean("mapred.reducer.new-api", true);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
//all maps would be running
|
||||||
|
Assert.assertEquals("No of tasks not correct",
|
||||||
|
3, job.getTasks().size());
|
||||||
|
it = job.getTasks().values().iterator();
|
||||||
|
mapTask1 = it.next();
|
||||||
|
mapTask2 = it.next();
|
||||||
|
reduceTask = it.next();
|
||||||
|
|
||||||
|
// first map will be recovered, no need to send done
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
app.waitForState(mapTask2, TaskState.RUNNING);
|
||||||
|
|
||||||
|
task2Attempt = mapTask2.getAttempts().values().iterator().next();
|
||||||
|
//before sending the TA_DONE, event make sure attempt has come to
|
||||||
|
//RUNNING state
|
||||||
|
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
|
||||||
|
|
||||||
|
//send the done signal to the 2nd map task
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(
|
||||||
|
mapTask2.getAttempts().values().iterator().next().getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
//wait to get it completed
|
||||||
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
// Crash the app again.
|
||||||
|
app.stop();
|
||||||
|
|
||||||
|
//rerun
|
||||||
|
//in rerun the 1st and 2nd map will be recovered from previous run
|
||||||
|
app =
|
||||||
|
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
|
||||||
|
++runCount);
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||||
|
conf.setBoolean("mapred.mapper.new-api", true);
|
||||||
|
conf.setBoolean("mapred.reducer.new-api", true);
|
||||||
|
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||||
|
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
|
job = app.submit(conf);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
//all maps would be running
|
||||||
|
Assert.assertEquals("No of tasks not correct",
|
||||||
|
3, job.getTasks().size());
|
||||||
|
it = job.getTasks().values().iterator();
|
||||||
|
mapTask1 = it.next();
|
||||||
|
mapTask2 = it.next();
|
||||||
|
reduceTask = it.next();
|
||||||
|
|
||||||
|
// The maps will be recovered, no need to send done
|
||||||
|
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||||
|
app.waitForState(mapTask2, TaskState.SUCCEEDED);
|
||||||
|
|
||||||
|
//wait for reduce to be running before sending done
|
||||||
|
app.waitForState(reduceTask, TaskState.RUNNING);
|
||||||
|
//send the done signal to the reduce
|
||||||
|
app.getContext().getEventHandler().handle(
|
||||||
|
new TaskAttemptEvent(
|
||||||
|
reduceTask.getAttempts().values().iterator().next().getID(),
|
||||||
|
TaskAttemptEventType.TA_DONE));
|
||||||
|
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
app.verifyCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOutputRecovery() throws Exception {
|
public void testOutputRecovery() throws Exception {
|
||||||
int runCount = 0;
|
int runCount = 0;
|
||||||
|
|
Loading…
Reference in New Issue