svn merge -c 1590668 FIXES: MAPREDUCE-5812. Make job context available to OutputCommitter.isRecoverySupported(). Contributed by Mohammad Kamrul Islam

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1590675 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-04-28 15:27:01 +00:00
parent 0f770f0897
commit 3434d7b78b
10 changed files with 359 additions and 9 deletions

View File

@ -33,6 +33,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5639. Port DistCp2 document to trunk (Akira AJISAKA via jeagles) MAPREDUCE-5639. Port DistCp2 document to trunk (Akira AJISAKA via jeagles)
MAPREDUCE-5812. Make job context available to
OutputCommitter.isRecoverySupported() (Mohammad Kamrul Islam via jlowe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -158,6 +158,7 @@
<Or> <Or>
<Method name="commitJob" /> <Method name="commitJob" />
<Method name="recoverTask" /> <Method name="recoverTask" />
<Method name="isRecoverySupported" />
</Or> </Or>
<Bug pattern="NM_WRONG_PACKAGE" /> <Bug pattern="NM_WRONG_PACKAGE" />
</Match> </Match>
@ -168,6 +169,7 @@
<Method name="commitJob" /> <Method name="commitJob" />
<Method name="cleanupJob" /> <Method name="cleanupJob" />
<Method name="recoverTask" /> <Method name="recoverTask" />
<Method name="isRecoverySupported" />
</Or> </Or>
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" /> <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match> </Match>

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl; import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.OutputFormat;
@ -67,6 +68,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -1093,7 +1095,24 @@ public class MRAppMaster extends CompositeService {
TaskLog.syncLogsShutdown(logSyncer); TaskLog.syncLogsShutdown(logSyncer);
} }
private void processRecovery() { private boolean isRecoverySupported(OutputCommitter committer2)
throws IOException {
boolean isSupported = false;
JobContext _jobContext;
if (committer != null) {
if (newApiCommitter) {
_jobContext = new JobContextImpl(
getConfig(), TypeConverter.fromYarn(getJobId()));
} else {
_jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
}
isSupported = committer.isRecoverySupported(_jobContext);
}
return isSupported;
}
private void processRecovery() throws IOException{
if (appAttemptID.getAttemptId() == 1) { if (appAttemptID.getAttemptId() == 1) {
return; // no need to recover on the first attempt return; // no need to recover on the first attempt
} }
@ -1101,8 +1120,8 @@ public class MRAppMaster extends CompositeService {
boolean recoveryEnabled = getConfig().getBoolean( boolean recoveryEnabled = getConfig().getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
boolean recoverySupportedByCommitter =
committer != null && committer.isRecoverySupported(); boolean recoverySupportedByCommitter = isRecoverySupported(committer);
// If a shuffle secret was not provided by the job client then this app // If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there // attempt will generate one. However that disables recovery if there

View File

@ -625,10 +625,18 @@ public class MRApp extends MRAppMaster {
throws IOException { throws IOException {
committer.abortJob(jobContext, state); committer.abortJob(jobContext, state);
} }
// Delegate the new-API, job-context-aware recovery check to the wrapped
// committer.
@Override
public boolean isRecoverySupported(JobContext jobContext) throws IOException{
return committer.isRecoverySupported(jobContext);
}
@SuppressWarnings("deprecation")
@Override @Override
public boolean isRecoverySupported() { public boolean isRecoverySupported() {
return committer.isRecoverySupported(); return committer.isRecoverySupported();
} }
@Override @Override
public void setupTask(TaskAttemptContext taskContext) public void setupTask(TaskAttemptContext taskContext)
throws IOException { throws IOException {

View File

@ -425,6 +425,266 @@ public class TestRecovery {
app.verifyCompleted(); app.verifyCompleted();
} }
/**
 * The class provides a custom implementation of the output committer's
 * isRecoverySupported method, which determines whether recovery is
 * supported based on a config property.
 */
public static class TestFileOutputCommitter extends
    org.apache.hadoop.mapred.FileOutputCommitter {
  @Override
  public boolean isRecoverySupported(
      org.apache.hadoop.mapred.JobContext jobContext) {
    // Recovery is opt-in: report support only when the job configuration
    // explicitly enables the "want.am.recovery" property.
    if (jobContext == null || jobContext.getConfiguration() == null) {
      return false;
    }
    return jobContext.getConfiguration().getBoolean("want.am.recovery", false);
  }
}
/**
 * This test case primarily verifies whether recovery is controlled through a
 * config property. In this case, recovery is turned ON. AM with 3 maps and 0
 * reduces. The AM crashes after the first two tasks finish, recovers
 * completely, and succeeds in the second generation.
 *
 * @throws Exception
 */
@Test
public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
// Install the custom committer that gates recovery on "want.am.recovery".
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean("want.am.recovery", true);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
// all maps would be running
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task mapTask3 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
.next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
.next();
TaskAttempt task3Attempt = mapTask3.getAttempts().values().iterator()
.next();
// before sending the TA_DONE event, make sure each attempt has come to
// RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 1st two maps
app.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(task1Attempt.getID(),
TaskAttemptEventType.TA_DONE));
app.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(task2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
// wait for the first two map tasks to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// stop the app
app.stop();
// rerun
// in rerun the 1st two maps will be recovered from previous run
// NOTE(review): constructor args here are (2, 1) while the recovered job
// has 3 maps / 0 reduces; recovery plus the conf overrides below appear
// to determine the actual task counts -- confirm.
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean("want.am.recovery", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
// Set num-reduces explicitly in conf as recovery logic depends on it.
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
mapTask3 = it.next();
// first two maps will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
app.waitForState(mapTask3, TaskState.RUNNING);
task3Attempt = mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 3rd map task
app.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
.next().getID(), TaskAttemptEventType.TA_DONE));
// wait for the 3rd map task to complete
app.waitForState(mapTask3, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
/**
 * This test case primarily verifies whether recovery is controlled through a
 * config property. In this case, recovery is turned OFF. AM with 3 maps and 0
 * reduces. The AM crashes after the first two tasks finish; recovery fails,
 * so the job has to rerun fully in the second generation, and it succeeds.
 *
 * @throws Exception
 */
@Test
public void testRecoveryFailsUsingCustomOutputCommitter() throws Exception {
int runCount = 0;
MRApp app =
new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
++runCount);
Configuration conf = new Configuration();
// Install the custom committer that gates recovery on "want.am.recovery".
conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean("want.am.recovery", false);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
// all maps would be running
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task mapTask3 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
TaskAttempt task1Attempt =
mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt =
mapTask2.getAttempts().values().iterator().next();
TaskAttempt task3Attempt =
mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure each attempt has come to
// RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 1st two maps
app
.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
app
.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
// wait for the first two map tasks to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// stop the app
app.stop();
// rerun
// with recovery disabled by the committer, nothing is recovered and all
// three maps rerun from scratch
// NOTE(review): constructor args here are (2, 1) while the job runs 3
// maps / 0 reduces; the conf overrides below appear to determine the
// actual task counts -- confirm.
app =
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean("want.am.recovery", false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
// Set num-reduces explicitly in conf as recovery logic depends on it.
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
mapTask3 = it.next();
// first two maps will NOT be recovered, need to send done from them
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
task3Attempt = mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to all 3 map tasks
app
.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(mapTask1.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
app
.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(mapTask2.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
app
.getContext()
.getEventHandler()
.handle(
new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
// wait for the 3rd map task to complete
app.waitForState(mapTask3, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
@Test @Test
public void testMultipleCrashes() throws Exception { public void testMultipleCrashes() throws Exception {

View File

@ -184,10 +184,16 @@ public class FileOutputCommitter extends OutputCommitter {
} }
@Override @Override
@Deprecated
public boolean isRecoverySupported() { public boolean isRecoverySupported() {
return true; return true;
} }
@Override
public boolean isRecoverySupported(JobContext context) throws IOException {
// Delegate the context-aware recovery check to the wrapped committer;
// getWrapped presumably resolves the underlying new-API committer for
// this context -- TODO confirm.
return getWrapped(context).isRecoverySupported(context);
}
@Override @Override
public void recoverTask(TaskAttemptContext context) public void recoverTask(TaskAttemptContext context)
throws IOException { throws IOException {

View File

@ -176,14 +176,34 @@ public abstract class OutputCommitter
/** /**
* This method implements the new interface by calling the old method. Note * This method implements the new interface by calling the old method. Note
* that the input types are different between the new and old apis and this * that the input types are different between the new and old apis and this is
* is a bridge between the two. * a bridge between the two.
*
* @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
*/ */
@Deprecated
@Override @Override
public boolean isRecoverySupported() { public boolean isRecoverySupported() {
return false; return false;
} }
/**
* Is task output recovery supported for restarting jobs?
*
* If task output recovery is supported, job restart can be done more
* efficiently.
*
* @param jobContext
* Context of the job whose output is being written.
* @return <code>true</code> if task output recovery is supported,
* <code>false</code> otherwise
* @throws IOException
* @see #recoverTask(TaskAttemptContext)
*/
public boolean isRecoverySupported(JobContext jobContext) throws IOException {
// Default implementation bridges to the deprecated no-arg variant so
// committers that only override the old method keep their behavior.
return isRecoverySupported();
}
/** /**
* Recover the task output. * Recover the task output.
* *
@ -315,4 +335,15 @@ public abstract class OutputCommitter
recoverTask((TaskAttemptContext) taskContext); recoverTask((TaskAttemptContext) taskContext);
} }
/**
 * This method implements the new {@code mapreduce}-level interface by
 * down-casting the context and calling the {@code mapred}-typed
 * {@link #isRecoverySupported(JobContext)} overload. Note that the input
 * types are different between the new and old apis and this is a bridge
 * between the two.
 */
@Override
public final boolean isRecoverySupported(
org.apache.hadoop.mapreduce.JobContext context) throws IOException {
return isRecoverySupported((JobContext) context);
}
} }

View File

@ -176,17 +176,36 @@ public abstract class OutputCommitter {
/** /**
* Is task output recovery supported for restarting jobs? * Is task output recovery supported for restarting jobs?
* *
* If task output recovery is supported, job restart can be done more * If task output recovery is supported, job restart can be done more
* efficiently. * efficiently.
* *
* @return <code>true</code> if task output recovery is supported, * @return <code>true</code> if task output recovery is supported,
* <code>false</code> otherwise * <code>false</code> otherwise
* @see #recoverTask(TaskAttemptContext) * @see #recoverTask(TaskAttemptContext)
* @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
*/ */
@Deprecated
public boolean isRecoverySupported() { public boolean isRecoverySupported() {
return false; return false;
} }
/**
* Is task output recovery supported for restarting jobs?
*
* If task output recovery is supported, job restart can be done more
* efficiently.
*
* @param jobContext
* Context of the job whose output is being written.
* @return <code>true</code> if task output recovery is supported,
* <code>false</code> otherwise
* @throws IOException
* @see #recoverTask(TaskAttemptContext)
*/
public boolean isRecoverySupported(JobContext jobContext) throws IOException {
// Default bridges to the deprecated no-arg method so existing committers
// that only override isRecoverySupported() keep working unchanged.
return isRecoverySupported();
}
/** /**
* Recover the task output. * Recover the task output.
* *

View File

@ -495,6 +495,7 @@ public class FileOutputCommitter extends OutputCommitter {
} }
@Override @Override
@Deprecated
public boolean isRecoverySupported() { public boolean isRecoverySupported() {
return true; return true;
} }

View File

@ -60,6 +60,7 @@ public class NullOutputFormat<K, V> extends OutputFormat<K, V> {
public void setupTask(TaskAttemptContext taskContext) { } public void setupTask(TaskAttemptContext taskContext) { }
@Override @Override
@Deprecated
public boolean isRecoverySupported() { public boolean isRecoverySupported() {
return true; return true;
} }