From a3a7efe74849add32263607b16cc590af87fab20 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Mon, 2 Feb 2015 13:43:05 -0800 Subject: [PATCH] MAPREDUCE-6143. add configuration for mapreduce speculative execution in MR2 (zxu via rkanter) (cherry picked from commit 8acc5e9b4b3fea8b418b3526c15022c8a9fedd56) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/speculate/DefaultSpeculator.java | 62 +++++++++++++++---- .../v2/app/TestRuntimeEstimators.java | 16 +++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 29 +++++++++ .../hadoop/mapreduce/util/ConfigUtil.java | 4 +- .../src/main/resources/mapred-default.xml | 41 ++++++++---- 6 files changed, 130 insertions(+), 25 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4ef7407a0f6..3ba94bbcf98 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -36,6 +36,9 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6151. Update document of GridMix (Masatake Iwasaki via aw) + MAPREDUCE-6143. add configuration for mapreduce speculative execution in + MR2 (zxu via rkanter) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java index 392a51aebd6..07a49af375f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.annotations.VisibleForTesting; public class DefaultSpeculator extends AbstractService implements Speculator { @@ -62,12 +63,11 @@ public class DefaultSpeculator extends AbstractService implements private static final long NOT_RUNNING = Long.MIN_VALUE + 4; private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5; - private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L; - private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L; - - private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1; - private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01; - private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10; + private long soonestRetryAfterNoSpeculate; + private long soonestRetryAfterSpeculate; + private double proportionRunningTasksSpeculatable; + private double proportionTotalTasksSpeculatable; + private int minimumAllowedSpeculativeTasks; private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class); @@ -163,6 +163,21 @@ public class DefaultSpeculator extends AbstractService implements this.estimator = estimator; this.clock = clock; this.eventHandler = context.getEventHandler(); + this.soonestRetryAfterNoSpeculate = + conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, + MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE); + this.soonestRetryAfterSpeculate = + conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, + MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE); + this.proportionRunningTasksSpeculatable = + conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, + MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS); + this.proportionTotalTasksSpeculatable = + conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, + MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS); + this.minimumAllowedSpeculativeTasks = + conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, + MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS); } /* ************************************************************* */ @@ -182,8 +197,8 @@ public class DefaultSpeculator extends AbstractService implements try { int speculations = computeSpeculations(); long mininumRecomp - = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE - : SOONEST_RETRY_AFTER_NO_SPECULATE; + = speculations > 0 ? soonestRetryAfterSpeculate + : soonestRetryAfterNoSpeculate; long wait = Math.max(mininumRecomp, clock.getTime() - backgroundRunStartTime); @@ -497,8 +512,8 @@ public class DefaultSpeculator extends AbstractService implements Map tasks = job.getTasks(type); int numberAllowedSpeculativeTasks - = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS, - PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size()); + = (int) Math.max(minimumAllowedSpeculativeTasks, + proportionTotalTasksSpeculatable * tasks.size()); TaskId bestTaskID = null; long bestSpeculationValue = -1L; @@ -523,7 +538,7 @@ public class DefaultSpeculator extends AbstractService implements } numberAllowedSpeculativeTasks = (int) Math.max(numberAllowedSpeculativeTasks, - PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks); + proportionRunningTasksSpeculatable * numberRunningTasks); // If we found a speculation target, fire it off if (bestTaskID != null @@ -583,4 +598,29 @@ public class DefaultSpeculator extends AbstractService implements this.lastHeartBeatTime = lastHeartBeatTime; } } + + @VisibleForTesting + public long getSoonestRetryAfterNoSpeculate() { + return soonestRetryAfterNoSpeculate; + } + + @VisibleForTesting + public long getSoonestRetryAfterSpeculate() { + return soonestRetryAfterSpeculate; + } + + @VisibleForTesting + public double getProportionRunningTasksSpeculatable() { + return proportionRunningTasksSpeculatable; + } + + @VisibleForTesting + public double getProportionTotalTasksSpeculatable() { + return proportionTotalTasksSpeculatable; + } + + @VisibleForTesting + public int getMinimumAllowedSpeculativeTasks() { + return minimumAllowedSpeculativeTasks; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index c25cf5060e9..fe0f34152be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -137,7 +138,22 @@ public class TestRuntimeEstimators { estimator.contextualize(conf, myAppContext); + conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 500L); + conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 5000L); + conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1); + conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.001); + conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 5); speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock); + Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_NO_SPECULATE value", + 500L, speculator.getSoonestRetryAfterNoSpeculate()); + Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_SPECULATE value", + 5000L, speculator.getSoonestRetryAfterSpeculate()); + Assert.assertEquals(speculator.getProportionRunningTasksSpeculatable(), + 0.1, 0.00001); + Assert.assertEquals(speculator.getProportionTotalTasksSpeculatable(), + 0.001, 0.00001); + Assert.assertEquals("wrong SPECULATIVE_MINIMUM_ALLOWED_TASKS value", + 5, speculator.getMinimumAllowedSpeculativeTasks()); dispatcher.register(Speculator.EventType.class, speculator); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 41e3fa4e594..1886d0c9006 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -86,12 +86,41 @@ public interface MRJobConfig { public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir"; + // SPECULATIVE_SLOWNODE_THRESHOLD is obsolete and will be deleted in the future + @Deprecated public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold"; public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold"; + // SPECULATIVECAP is obsolete and will be deleted in the future + @Deprecated public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap"; + public static final String SPECULATIVECAP_RUNNING_TASKS = + "mapreduce.job.speculative.speculative-cap-running-tasks"; + public static final double DEFAULT_SPECULATIVECAP_RUNNING_TASKS = + 0.1; + + public static final String SPECULATIVECAP_TOTAL_TASKS = + "mapreduce.job.speculative.speculative-cap-total-tasks"; + public static final double DEFAULT_SPECULATIVECAP_TOTAL_TASKS = + 0.01; + + public static final String SPECULATIVE_MINIMUM_ALLOWED_TASKS = + "mapreduce.job.speculative.minimum-allowed-tasks"; + public static final int DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS = + 10; + + public static final String SPECULATIVE_RETRY_AFTER_NO_SPECULATE = + "mapreduce.job.speculative.retry-after-no-speculate"; + public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE = + 1000L; + + public static final String SPECULATIVE_RETRY_AFTER_SPECULATE = + "mapreduce.job.speculative.retry-after-speculate"; + public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE = + 15000L; + public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir"; public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java index b1756cee8d7..bbbf99c4beb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java @@ -214,12 +214,10 @@ public class ConfigUtil { MRJobConfig.SKIP_RECORDS), new DeprecationDelta("mapred.skip.out.dir", MRJobConfig.SKIP_OUTDIR), - new DeprecationDelta("mapred.speculative.execution.slowNodeThreshold", - MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD), new DeprecationDelta("mapred.speculative.execution.slowTaskThreshold", MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD), new DeprecationDelta("mapred.speculative.execution.speculativeCap", - MRJobConfig.SPECULATIVECAP), + MRJobConfig.SPECULATIVECAP_RUNNING_TASKS), new DeprecationDelta("job.local.dir", MRJobConfig.JOB_LOCAL_DIR), new DeprecationDelta("mapreduce.inputformat.class", diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index a14d7ed42de..8700ecbb6fd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -736,13 +736,42 @@ If true, then multiple instances of some reduce tasks may be executed in parallel. + - mapreduce.job.speculative.speculativecap + mapreduce.job.speculative.speculative-cap-running-tasks 0.1 The max percent (0-1) of running tasks that can be speculatively re-executed at any time. + + mapreduce.job.speculative.speculative-cap-total-tasks + 0.01 + The max percent (0-1) of all tasks that + can be speculatively re-executed at any time. + + + + mapreduce.job.speculative.minimum-allowed-tasks + 10 + The minimum allowed tasks that + can be speculatively re-executed at any time. + + + + mapreduce.job.speculative.retry-after-no-speculate + 1000 + The waiting time(ms) to do next round of speculation + if there is no task speculated in this round. + + + + mapreduce.job.speculative.retry-after-speculate + 15000 + The waiting time(ms) to do next round of speculation + if there are tasks speculated in this round. + + mapreduce.job.map.output.collector.class org.apache.hadoop.mapred.MapTask$MapOutputBuffer @@ -762,16 +791,6 @@ - - mapreduce.job.speculative.slownodethreshold - 1.0 - The number of standard deviations by which a Task - Tracker's ave map and reduce progress-rates (finishTime-dispatchTime) - must be lower than the average of all successful map/reduce task's for - the TT to be considered too slow to give a speculative task to. - - - mapreduce.job.jvm.numtasks 1