diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5826ca22be6..c994d91a521 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -490,6 +490,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral web port is configured. (Bhallamudi Venkata Siva Kamesh via vinodkv) + MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable + speculating either maps or reduces. (Eric Payne via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 8ba241ec02d..6097e377d18 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -258,7 +258,7 @@ public void init(final Configuration conf) { dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(TaskCleaner.EventType.class, taskCleaner); - + if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) { //optional service to speculate on task attempts' progress @@ -881,9 +881,31 @@ public SpeculatorEventDispatcher(Configuration config) { } @Override public void handle(SpeculatorEvent event) { - if (!disabled && - (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) - || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) { + if (disabled) { + return; + } + + TaskId tId = event.getTaskID(); + TaskType tType = null; + /* 
event's TaskId will be null if the event type is JOB_CREATE or + * ATTEMPT_STATUS_UPDATE + */ + if (tId != null) { + tType = tId.getTaskType(); + } + boolean shouldMapSpec = + conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false); + boolean shouldReduceSpec = + conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + + /* The point of the following is to allow the MAP and REDUCE speculative + * config values to be independent: + * IF spec-exec is turned on for maps AND the task is a map task + * OR IF spec-exec is turned on for reduces AND the task is a reduce task + * THEN call the speculator to handle the event. + */ + if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) + || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { // Speculator IS enabled, direct the event to there. speculator.handle(event); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java new file mode 100644 index 00000000000..12bb5ac0e79 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java @@ -0,0 +1,309 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSpeculativeExecution { + + /* + * This class is used to control when speculative execution happens. 
+ */ + public static class TestSpecEstimator extends LegacyTaskRuntimeEstimator { + private static final long SPECULATE_THIS = 999999L; + + public TestSpecEstimator() { + super(); + } + + /* + * This will only be called if speculative execution is turned on. + * + * If either mapper or reducer speculation is turned on, this will be + * called. + * + * This will cause speculation to engage for the first mapper or first + * reducer (that is, attempt ID "*_m_000000_0" or "*_r_000000_0") + * + * If this attempt is killed, the retry will have attempt id 1, so it + * will not engage speculation again. + */ + @Override + public long estimatedRuntime(TaskAttemptId id) { + if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) { + return SPECULATE_THIS; + } + return super.estimatedRuntime(id); + } + } + + private static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class); + + protected static MiniMRYarnCluster mrCluster; + + private static Configuration initialConf = new Configuration(); + private static FileSystem localFs; + static { + try { + localFs = FileSystem.getLocal(initialConf); + } catch (IOException io) { + throw new RuntimeException("problem getting local fs", io); + } + } + + private static Path TEST_ROOT_DIR = + new Path("target",TestSpeculativeExecution.class.getName() + "-tmpDir") + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); + private static Path TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir"); + + @BeforeClass + public static void setup() throws IOException { + + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. 
Not running test."); + return; + } + + if (mrCluster == null) { + mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 4); + Configuration conf = new Configuration(); + mrCluster.init(conf); + mrCluster.start(); + } + + // workaround the absent public distcache. + localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR); + localFs.setPermission(APP_JAR, new FsPermission("700")); + } + + @AfterClass + public static void tearDown() { + if (mrCluster != null) { + mrCluster.stop(); + mrCluster = null; + } + } + + public static class SpeculativeMapper extends + Mapper { + + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + // Make one mapper slower for speculative execution + TaskAttemptID taid = context.getTaskAttemptID(); + long sleepTime = 100; + Configuration conf = context.getConfiguration(); + boolean test_speculate_map = + conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false); + + // IF TESTING MAPPER SPECULATIVE EXECUTION: + // Make the "*_m_000000_0" attempt take much longer than the others. + // When speculative execution is enabled, this should cause the attempt + // to be killed and restarted. At that point, the attempt ID will be + // "*_m_000000_1", so sleepTime will still remain 100ms. 
+ if ( (taid.getTaskType() == TaskType.MAP) && test_speculate_map + && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) { + sleepTime = 10000; + } + try{ + Thread.sleep(sleepTime); + } catch(InterruptedException ie) { + // Ignore + } + context.write(value, new IntWritable(1)); + } + } + + public static class SpeculativeReducer extends + Reducer { + + public void reduce(Text key, Iterable values, + Context context) throws IOException, InterruptedException { + // Make one reducer slower for speculative execution + TaskAttemptID taid = context.getTaskAttemptID(); + long sleepTime = 100; + Configuration conf = context.getConfiguration(); + boolean test_speculate_reduce = + conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + + // IF TESTING REDUCE SPECULATIVE EXECUTION: + // Make the "*_r_000000_0" attempt take much longer than the others. + // When speculative execution is enabled, this should cause the attempt + // to be killed and restarted. At that point, the attempt ID will be + // "*_r_000000_1", so sleepTime will still remain 100ms. + if ( (taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce + && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) { + sleepTime = 10000; + } + try{ + Thread.sleep(sleepTime); + } catch(InterruptedException ie) { + // Ignore + } + context.write(key,new IntWritable(0)); + } + } + + @Test + public void testSpeculativeExecution() throws Exception { + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + /*------------------------------------------------------------------ + * Test that Map/Red does not speculate if MAP_SPECULATIVE and + * REDUCE_SPECULATIVE are both false. 
+ * ----------------------------------------------------------------- + */ + Job job = runSpecTest(false, false); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + Counters counters = job.getCounters(); + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue()); + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue()); + Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS) + .getValue()); + + /*---------------------------------------------------------------------- + * Test that Mapper speculates if MAP_SPECULATIVE is true and + * REDUCE_SPECULATIVE is false. + * --------------------------------------------------------------------- + */ + job = runSpecTest(true, false); + + succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + counters = job.getCounters(); + + // The long-running map will be killed and a new one started. + Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue()); + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue()); + Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_FAILED_MAPS) + .getValue()); + + /*---------------------------------------------------------------------- + * Test that Reducer speculates if REDUCE_SPECULATIVE is true and + * MAP_SPECULATIVE is false. + * --------------------------------------------------------------------- + */ + job = runSpecTest(false, true); + + succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + counters = job.getCounters(); + + // The long-running reduce will be killed and a new one started. 
+ Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue()); + Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue()); + } + + private Path createTempFile(String filename, String contents) + throws IOException { + Path path = new Path(TEST_ROOT_DIR, filename); + FSDataOutputStream os = localFs.create(path); + os.writeBytes(contents); + os.close(); + localFs.setPermission(path, new FsPermission("700")); + return path; + } + + private Job runSpecTest(boolean mapspec, boolean redspec) + throws IOException, ClassNotFoundException, InterruptedException { + + Path first = createTempFile("specexec_map_input1", "a\nz"); + Path secnd = createTempFile("specexec_map_input2", "a\nz"); + + Configuration conf = mrCluster.getConfig(); + conf.setBoolean(MRJobConfig.MAP_SPECULATIVE,mapspec); + conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE,redspec); + conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, + TestSpecEstimator.class, + TaskRuntimeEstimator.class); + + Job job = Job.getInstance(conf); + job.setJarByClass(TestSpeculativeExecution.class); + job.setMapperClass(SpeculativeMapper.class); + job.setReducerClass(SpeculativeReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setNumReduceTasks(2); + FileInputFormat.setInputPaths(job, first); + FileInputFormat.addInputPath(job, secnd); + FileOutputFormat.setOutputPath(job, TEST_OUT_DIR); + + // Delete output directory if it exists. + try { + localFs.delete(TEST_OUT_DIR,true); + } catch (IOException e) { + // ignore + } + + // Put the app jar on the job classpath and cap map attempts before submitting + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.createSymlink(); + job.setMaxMapAttempts(2); + + job.submit(); + + return job; + } +}