MAPREDUCE-7278. Speculative execution behavior is observed even when mapreduce.map.speculative and mapreduce.reduce.speculative are false
Contributed by Tarun Parimi.
(cherry picked from commit 10db97df1c
)
This commit is contained in:
parent
d6de530bbb
commit
e2713d5788
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||||
|
@ -142,7 +143,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
private boolean historyTaskStartGenerated = false;
|
private boolean historyTaskStartGenerated = false;
|
||||||
// Launch time reported in history events.
|
// Launch time reported in history events.
|
||||||
private long launchTime;
|
private long launchTime;
|
||||||
|
private boolean speculationEnabled = false;
|
||||||
|
|
||||||
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
||||||
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
|
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
|
||||||
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
private static final SingleArcTransition<TaskImpl, TaskEvent>
|
||||||
|
@ -325,6 +327,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
this.appContext = appContext;
|
this.appContext = appContext;
|
||||||
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
|
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
|
||||||
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
|
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
|
||||||
|
this.speculationEnabled = taskType.equals(TaskType.MAP) ?
|
||||||
|
conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) :
|
||||||
|
conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
|
||||||
|
|
||||||
// This "this leak" is okay because the retained pointer is in an
|
// This "this leak" is okay because the retained pointer is in an
|
||||||
// instance variable.
|
// instance variable.
|
||||||
|
@ -1079,13 +1084,19 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
||||||
if (task.successfulAttempt == null) {
|
if (task.successfulAttempt == null) {
|
||||||
boolean shouldAddNewAttempt = true;
|
boolean shouldAddNewAttempt = true;
|
||||||
if (task.inProgressAttempts.size() > 0) {
|
if (task.inProgressAttempts.size() > 0) {
|
||||||
// if not all of the inProgressAttempts are hanging for resource
|
if(task.speculationEnabled) {
|
||||||
for (TaskAttemptId attemptId : task.inProgressAttempts) {
|
// if not all of the inProgressAttempts are hanging for resource
|
||||||
if (((TaskAttemptImpl) task.getAttempt(attemptId))
|
for (TaskAttemptId attemptId : task.inProgressAttempts) {
|
||||||
.isContainerAssigned()) {
|
if (((TaskAttemptImpl) task.getAttempt(attemptId))
|
||||||
shouldAddNewAttempt = false;
|
.isContainerAssigned()) {
|
||||||
break;
|
shouldAddNewAttempt = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// No need to add new attempt if there are in progress attempts
|
||||||
|
// when speculation is false
|
||||||
|
shouldAddNewAttempt = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (shouldAddNewAttempt) {
|
if (shouldAddNewAttempt) {
|
||||||
|
|
|
@ -192,6 +192,46 @@ public class TestSpeculativeExecution {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class FailOnceMapper extends
|
||||||
|
Mapper<Object, Text, Text, IntWritable> {
|
||||||
|
|
||||||
|
public void map(Object key, Text value, Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
TaskAttemptID taid = context.getTaskAttemptID();
|
||||||
|
try{
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} catch(InterruptedException ie) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
// Fail mapper only for first attempt
|
||||||
|
if (taid.getId() == 0) {
|
||||||
|
throw new RuntimeException("Failing this mapper");
|
||||||
|
}
|
||||||
|
|
||||||
|
context.write(value, new IntWritable(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FailOnceReducer extends
|
||||||
|
Reducer<Text, IntWritable, Text, IntWritable> {
|
||||||
|
|
||||||
|
public void reduce(Text key, Iterable<IntWritable> values,
|
||||||
|
Context context) throws IOException, InterruptedException {
|
||||||
|
TaskAttemptID taid = context.getTaskAttemptID();
|
||||||
|
try{
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} catch(InterruptedException ie) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
// Fail reduce only for first attempt
|
||||||
|
if (taid.getId() == 0) {
|
||||||
|
throw new RuntimeException("Failing this reducer");
|
||||||
|
}
|
||||||
|
context.write(key, new IntWritable(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSpeculativeExecution() throws Exception {
|
public void testSpeculativeExecution() throws Exception {
|
||||||
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
||||||
|
@ -218,6 +258,30 @@ public class TestSpeculativeExecution {
|
||||||
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
|
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
|
||||||
.getValue());
|
.getValue());
|
||||||
|
|
||||||
|
|
||||||
|
/*------------------------------------------------------------------
|
||||||
|
* Test that Map/Red does not speculate if MAP_SPECULATIVE and
|
||||||
|
* REDUCE_SPECULATIVE are both false. When map tasks fail once and time out,
|
||||||
|
* we shouldn't launch two simultaneous attempts. MAPREDUCE-7278
|
||||||
|
* -----------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
job = runNonSpecFailOnceTest();
|
||||||
|
|
||||||
|
succeeded = job.waitForCompletion(true);
|
||||||
|
Assert.assertTrue(succeeded);
|
||||||
|
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
|
||||||
|
counters = job.getCounters();
|
||||||
|
// We will have 4 total since 2 map tasks fail and relaunch attempt once
|
||||||
|
Assert.assertEquals(4,
|
||||||
|
counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue());
|
||||||
|
Assert.assertEquals(4,
|
||||||
|
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
|
||||||
|
// Ensure no maps or reduces killed due to accidental speculation
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
counters.findCounter(JobCounter.NUM_KILLED_MAPS).getValue());
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
counters.findCounter(JobCounter.NUM_KILLED_REDUCES).getValue());
|
||||||
|
|
||||||
/*----------------------------------------------------------------------
|
/*----------------------------------------------------------------------
|
||||||
* Test that Mapper speculates if MAP_SPECULATIVE is true and
|
* Test that Mapper speculates if MAP_SPECULATIVE is true and
|
||||||
* REDUCE_SPECULATIVE is false.
|
* REDUCE_SPECULATIVE is false.
|
||||||
|
@ -295,7 +359,48 @@ public class TestSpeculativeExecution {
|
||||||
|
|
||||||
// Delete output directory if it exists.
|
// Delete output directory if it exists.
|
||||||
try {
|
try {
|
||||||
localFs.delete(TEST_OUT_DIR,true);
|
localFs.delete(TEST_OUT_DIR, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates the Job Configuration
|
||||||
|
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
||||||
|
job.setMaxMapAttempts(2);
|
||||||
|
|
||||||
|
job.submit();
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Job runNonSpecFailOnceTest()
|
||||||
|
throws IOException, ClassNotFoundException, InterruptedException {
|
||||||
|
|
||||||
|
Path first = createTempFile("specexec_map_input1", "a\nz");
|
||||||
|
Path secnd = createTempFile("specexec_map_input2", "a\nz");
|
||||||
|
|
||||||
|
Configuration conf = mrCluster.getConfig();
|
||||||
|
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
|
||||||
|
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
|
||||||
|
// Prevent blacklisting since tasks fail once
|
||||||
|
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, false);
|
||||||
|
// Setting small task exit timeout values reproduces MAPREDUCE-7278
|
||||||
|
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 20);
|
||||||
|
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
|
||||||
|
Job job = Job.getInstance(conf);
|
||||||
|
job.setJarByClass(TestSpeculativeExecution.class);
|
||||||
|
job.setMapperClass(FailOnceMapper.class);
|
||||||
|
job.setReducerClass(FailOnceReducer.class);
|
||||||
|
job.setOutputKeyClass(Text.class);
|
||||||
|
job.setOutputValueClass(IntWritable.class);
|
||||||
|
job.setNumReduceTasks(2);
|
||||||
|
FileInputFormat.setInputPaths(job, first);
|
||||||
|
FileInputFormat.addInputPath(job, secnd);
|
||||||
|
FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
|
||||||
|
|
||||||
|
// Delete output directory if it exists.
|
||||||
|
try {
|
||||||
|
localFs.delete(TEST_OUT_DIR, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue