svn merge -c 1431131 FIXES: MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1431134 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5da072bd59
commit
fb0a8ff926
|
@ -525,6 +525,9 @@ Release 0.23.6 - UNRELEASED
|
|||
MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally
|
||||
exits (Jason Lowe via tgraves)
|
||||
|
||||
MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
|
||||
Chen via jlowe)
|
||||
|
||||
Release 0.23.5 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -579,7 +579,7 @@ public class MRAppMaster extends CompositeService {
|
|||
*/
|
||||
protected Recovery createRecoveryService(AppContext appContext) {
|
||||
return new RecoveryService(appContext.getApplicationAttemptId(),
|
||||
appContext.getClock(), getCommitter());
|
||||
appContext.getClock(), getCommitter(), isNewApiCommitter());
|
||||
}
|
||||
|
||||
/** Create and initialize (but don't start) a single job.
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
|
@ -100,6 +101,7 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final OutputCommitter committer;
|
||||
private final boolean newApiCommitter;
|
||||
private final Dispatcher dispatcher;
|
||||
private final ControlledClock clock;
|
||||
|
||||
|
@ -113,10 +115,11 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
private volatile boolean recoveryMode = false;
|
||||
|
||||
public RecoveryService(ApplicationAttemptId applicationAttemptId,
|
||||
Clock clock, OutputCommitter committer) {
|
||||
Clock clock, OutputCommitter committer, boolean newApiCommitter) {
|
||||
super("RecoveringDispatcher");
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
this.committer = committer;
|
||||
this.newApiCommitter = newApiCommitter;
|
||||
this.dispatcher = createRecoveryDispatcher();
|
||||
this.clock = new ControlledClock(clock);
|
||||
addService((Service) dispatcher);
|
||||
|
@ -360,8 +363,17 @@ public class RecoveryService extends CompositeService implements Recovery {
|
|||
switch (state) {
|
||||
case SUCCEEDED:
|
||||
//recover the task output
|
||||
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
|
||||
|
||||
// check the committer type and construct corresponding context
|
||||
TaskAttemptContext taskContext = null;
|
||||
if(newApiCommitter) {
|
||||
taskContext = new TaskAttemptContextImpl(getConfig(),
|
||||
attInfo.getAttemptId());
|
||||
} else {
|
||||
taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
|
||||
TypeConverter.fromYarn(aId));
|
||||
}
|
||||
|
||||
try {
|
||||
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
|
||||
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
|
||||
|
|
|
@ -626,6 +626,115 @@ public class TestRecovery {
|
|||
validateOutput();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryWithOldCommiter() throws Exception {
|
||||
int runCount = 0;
|
||||
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
|
||||
true, ++runCount);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean("mapred.mapper.new-api", false);
|
||||
conf.setBoolean("mapred.reducer.new-api", false);
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("No of tasks not correct",
|
||||
3, job.getTasks().size());
|
||||
Iterator<Task> it = job.getTasks().values().iterator();
|
||||
Task mapTask1 = it.next();
|
||||
Task reduceTask1 = it.next();
|
||||
|
||||
// all maps must be running
|
||||
app.waitForState(mapTask1, TaskState.RUNNING);
|
||||
|
||||
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
|
||||
.next();
|
||||
|
||||
//before sending the TA_DONE, event make sure attempt has come to
|
||||
//RUNNING state
|
||||
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
|
||||
|
||||
//send the done signal to the map
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
task1Attempt1.getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
//wait for map task to complete
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
|
||||
// Verify the shuffle-port
|
||||
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
|
||||
|
||||
app.waitForState(reduceTask1, TaskState.RUNNING);
|
||||
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
|
||||
|
||||
// write output corresponding to reduce1
|
||||
writeOutput(reduce1Attempt1, conf);
|
||||
|
||||
//send the done signal to the 1st reduce
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
reduce1Attempt1.getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
//wait for first reduce task to complete
|
||||
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
|
||||
|
||||
//stop the app before the job completes.
|
||||
app.stop();
|
||||
|
||||
//rerun
|
||||
//in rerun the map will be recovered from previous run
|
||||
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
|
||||
++runCount);
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||
conf.setBoolean("mapred.mapper.new-api", false);
|
||||
conf.setBoolean("mapred.reducer.new-api", false);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
Assert.assertEquals("No of tasks not correct",
|
||||
3, job.getTasks().size());
|
||||
it = job.getTasks().values().iterator();
|
||||
mapTask1 = it.next();
|
||||
reduceTask1 = it.next();
|
||||
Task reduceTask2 = it.next();
|
||||
|
||||
// map will be recovered, no need to send done
|
||||
app.waitForState(mapTask1, TaskState.SUCCEEDED);
|
||||
|
||||
// Verify the shuffle-port after recovery
|
||||
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
|
||||
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
|
||||
|
||||
// first reduce will be recovered, no need to send done
|
||||
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
|
||||
|
||||
app.waitForState(reduceTask2, TaskState.RUNNING);
|
||||
|
||||
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
|
||||
.iterator().next();
|
||||
//before sending the TA_DONE, event make sure attempt has come to
|
||||
//RUNNING state
|
||||
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
|
||||
|
||||
//send the done signal to the 2nd reduce task
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(
|
||||
reduce2Attempt.getID(),
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
//wait to get it completed
|
||||
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
|
||||
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
app.verifyCompleted();
|
||||
validateOutput();
|
||||
}
|
||||
|
||||
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
|
||||
throws Exception {
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
|
||||
|
|
Loading…
Reference in New Issue