MAPREDUCE-6143. add configuration for mapreduce speculative execution in MR2 (zxu via rkanter)
(cherry picked from commit 8acc5e9b4b)
This commit is contained in:
parent
8426c7d806
commit
a3a7efe748
|
@ -36,6 +36,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
MAPREDUCE-6151. Update document of GridMix (Masatake Iwasaki via aw)
|
MAPREDUCE-6151. Update document of GridMix (Masatake Iwasaki via aw)
|
||||||
|
|
||||||
|
MAPREDUCE-6143. Add configuration for mapreduce speculative execution in
|
||||||
|
MR2 (zxu via rkanter)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
MAPREDUCE-6169. MergeQueue should release reference to the current item
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
public class DefaultSpeculator extends AbstractService implements
|
public class DefaultSpeculator extends AbstractService implements
|
||||||
Speculator {
|
Speculator {
|
||||||
|
@ -62,12 +63,11 @@ public class DefaultSpeculator extends AbstractService implements
|
||||||
private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
|
private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
|
||||||
private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
|
private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
|
||||||
|
|
||||||
private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
|
private long soonestRetryAfterNoSpeculate;
|
||||||
private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
|
private long soonestRetryAfterSpeculate;
|
||||||
|
private double proportionRunningTasksSpeculatable;
|
||||||
private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
|
private double proportionTotalTasksSpeculatable;
|
||||||
private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
|
private int minimumAllowedSpeculativeTasks;
|
||||||
private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
|
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
|
private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
|
||||||
|
|
||||||
|
@ -163,6 +163,21 @@ public class DefaultSpeculator extends AbstractService implements
|
||||||
this.estimator = estimator;
|
this.estimator = estimator;
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
this.eventHandler = context.getEventHandler();
|
this.eventHandler = context.getEventHandler();
|
||||||
|
this.soonestRetryAfterNoSpeculate =
|
||||||
|
conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
|
||||||
|
MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
|
||||||
|
this.soonestRetryAfterSpeculate =
|
||||||
|
conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
|
||||||
|
MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
|
||||||
|
this.proportionRunningTasksSpeculatable =
|
||||||
|
conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
|
||||||
|
MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
|
||||||
|
this.proportionTotalTasksSpeculatable =
|
||||||
|
conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
|
||||||
|
MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
|
||||||
|
this.minimumAllowedSpeculativeTasks =
|
||||||
|
conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
|
||||||
|
MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ************************************************************* */
|
/* ************************************************************* */
|
||||||
|
@ -182,8 +197,8 @@ public class DefaultSpeculator extends AbstractService implements
|
||||||
try {
|
try {
|
||||||
int speculations = computeSpeculations();
|
int speculations = computeSpeculations();
|
||||||
long mininumRecomp
|
long mininumRecomp
|
||||||
= speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
|
= speculations > 0 ? soonestRetryAfterSpeculate
|
||||||
: SOONEST_RETRY_AFTER_NO_SPECULATE;
|
: soonestRetryAfterNoSpeculate;
|
||||||
|
|
||||||
long wait = Math.max(mininumRecomp,
|
long wait = Math.max(mininumRecomp,
|
||||||
clock.getTime() - backgroundRunStartTime);
|
clock.getTime() - backgroundRunStartTime);
|
||||||
|
@ -497,8 +512,8 @@ public class DefaultSpeculator extends AbstractService implements
|
||||||
Map<TaskId, Task> tasks = job.getTasks(type);
|
Map<TaskId, Task> tasks = job.getTasks(type);
|
||||||
|
|
||||||
int numberAllowedSpeculativeTasks
|
int numberAllowedSpeculativeTasks
|
||||||
= (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
|
= (int) Math.max(minimumAllowedSpeculativeTasks,
|
||||||
PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
|
proportionTotalTasksSpeculatable * tasks.size());
|
||||||
|
|
||||||
TaskId bestTaskID = null;
|
TaskId bestTaskID = null;
|
||||||
long bestSpeculationValue = -1L;
|
long bestSpeculationValue = -1L;
|
||||||
|
@ -523,7 +538,7 @@ public class DefaultSpeculator extends AbstractService implements
|
||||||
}
|
}
|
||||||
numberAllowedSpeculativeTasks
|
numberAllowedSpeculativeTasks
|
||||||
= (int) Math.max(numberAllowedSpeculativeTasks,
|
= (int) Math.max(numberAllowedSpeculativeTasks,
|
||||||
PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
|
proportionRunningTasksSpeculatable * numberRunningTasks);
|
||||||
|
|
||||||
// If we found a speculation target, fire it off
|
// If we found a speculation target, fire it off
|
||||||
if (bestTaskID != null
|
if (bestTaskID != null
|
||||||
|
@ -583,4 +598,29 @@ public class DefaultSpeculator extends AbstractService implements
|
||||||
this.lastHeartBeatTime = lastHeartBeatTime;
|
this.lastHeartBeatTime = lastHeartBeatTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getSoonestRetryAfterNoSpeculate() {
|
||||||
|
return soonestRetryAfterNoSpeculate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getSoonestRetryAfterSpeculate() {
|
||||||
|
return soonestRetryAfterSpeculate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public double getProportionRunningTasksSpeculatable() {
|
||||||
|
return proportionRunningTasksSpeculatable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public double getProportionTotalTasksSpeculatable() {
|
||||||
|
return proportionTotalTasksSpeculatable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getMinimumAllowedSpeculativeTasks() {
|
||||||
|
return minimumAllowedSpeculativeTasks;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.JobACL;
|
import org.apache.hadoop.mapreduce.JobACL;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
|
@ -137,7 +138,22 @@ public class TestRuntimeEstimators {
|
||||||
|
|
||||||
estimator.contextualize(conf, myAppContext);
|
estimator.contextualize(conf, myAppContext);
|
||||||
|
|
||||||
|
conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 500L);
|
||||||
|
conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 5000L);
|
||||||
|
conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1);
|
||||||
|
conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.001);
|
||||||
|
conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 5);
|
||||||
speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
|
speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
|
||||||
|
Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_NO_SPECULATE value",
|
||||||
|
500L, speculator.getSoonestRetryAfterNoSpeculate());
|
||||||
|
Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_SPECULATE value",
|
||||||
|
5000L, speculator.getSoonestRetryAfterSpeculate());
|
||||||
|
Assert.assertEquals(speculator.getProportionRunningTasksSpeculatable(),
|
||||||
|
0.1, 0.00001);
|
||||||
|
Assert.assertEquals(speculator.getProportionTotalTasksSpeculatable(),
|
||||||
|
0.001, 0.00001);
|
||||||
|
Assert.assertEquals("wrong SPECULATIVE_MINIMUM_ALLOWED_TASKS value",
|
||||||
|
5, speculator.getMinimumAllowedSpeculativeTasks());
|
||||||
|
|
||||||
dispatcher.register(Speculator.EventType.class, speculator);
|
dispatcher.register(Speculator.EventType.class, speculator);
|
||||||
|
|
||||||
|
|
|
@ -86,12 +86,41 @@ public interface MRJobConfig {
|
||||||
|
|
||||||
public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
|
public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
|
||||||
|
|
||||||
|
// SPECULATIVE_SLOWNODE_THRESHOLD is obsolete and will be deleted in the future
|
||||||
|
@Deprecated
|
||||||
public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
|
public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
|
||||||
|
|
||||||
public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
|
public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
|
||||||
|
|
||||||
|
// SPECULATIVECAP is obsolete and will be deleted in the future
|
||||||
|
@Deprecated
|
||||||
public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
|
public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
|
||||||
|
|
||||||
|
public static final String SPECULATIVECAP_RUNNING_TASKS =
|
||||||
|
"mapreduce.job.speculative.speculative-cap-running-tasks";
|
||||||
|
public static final double DEFAULT_SPECULATIVECAP_RUNNING_TASKS =
|
||||||
|
0.1;
|
||||||
|
|
||||||
|
public static final String SPECULATIVECAP_TOTAL_TASKS =
|
||||||
|
"mapreduce.job.speculative.speculative-cap-total-tasks";
|
||||||
|
public static final double DEFAULT_SPECULATIVECAP_TOTAL_TASKS =
|
||||||
|
0.01;
|
||||||
|
|
||||||
|
public static final String SPECULATIVE_MINIMUM_ALLOWED_TASKS =
|
||||||
|
"mapreduce.job.speculative.minimum-allowed-tasks";
|
||||||
|
public static final int DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS =
|
||||||
|
10;
|
||||||
|
|
||||||
|
public static final String SPECULATIVE_RETRY_AFTER_NO_SPECULATE =
|
||||||
|
"mapreduce.job.speculative.retry-after-no-speculate";
|
||||||
|
public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE =
|
||||||
|
1000L;
|
||||||
|
|
||||||
|
public static final String SPECULATIVE_RETRY_AFTER_SPECULATE =
|
||||||
|
"mapreduce.job.speculative.retry-after-speculate";
|
||||||
|
public static final long DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE =
|
||||||
|
15000L;
|
||||||
|
|
||||||
public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
|
public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
|
||||||
|
|
||||||
public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
|
public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
|
||||||
|
|
|
@ -214,12 +214,10 @@ public class ConfigUtil {
|
||||||
MRJobConfig.SKIP_RECORDS),
|
MRJobConfig.SKIP_RECORDS),
|
||||||
new DeprecationDelta("mapred.skip.out.dir",
|
new DeprecationDelta("mapred.skip.out.dir",
|
||||||
MRJobConfig.SKIP_OUTDIR),
|
MRJobConfig.SKIP_OUTDIR),
|
||||||
new DeprecationDelta("mapred.speculative.execution.slowNodeThreshold",
|
|
||||||
MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD),
|
|
||||||
new DeprecationDelta("mapred.speculative.execution.slowTaskThreshold",
|
new DeprecationDelta("mapred.speculative.execution.slowTaskThreshold",
|
||||||
MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD),
|
MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD),
|
||||||
new DeprecationDelta("mapred.speculative.execution.speculativeCap",
|
new DeprecationDelta("mapred.speculative.execution.speculativeCap",
|
||||||
MRJobConfig.SPECULATIVECAP),
|
MRJobConfig.SPECULATIVECAP_RUNNING_TASKS),
|
||||||
new DeprecationDelta("job.local.dir",
|
new DeprecationDelta("job.local.dir",
|
||||||
MRJobConfig.JOB_LOCAL_DIR),
|
MRJobConfig.JOB_LOCAL_DIR),
|
||||||
new DeprecationDelta("mapreduce.inputformat.class",
|
new DeprecationDelta("mapreduce.inputformat.class",
|
||||||
|
|
|
@ -736,13 +736,42 @@
|
||||||
<description>If true, then multiple instances of some reduce tasks
|
<description>If true, then multiple instances of some reduce tasks
|
||||||
may be executed in parallel.</description>
|
may be executed in parallel.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.job.speculative.speculativecap</name>
|
<name>mapreduce.job.speculative.speculative-cap-running-tasks</name>
|
||||||
<value>0.1</value>
|
<value>0.1</value>
|
||||||
<description>The max percent (0-1) of running tasks that
|
<description>The max percent (0-1) of running tasks that
|
||||||
can be speculatively re-executed at any time.</description>
|
can be speculatively re-executed at any time.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.speculative.speculative-cap-total-tasks</name>
|
||||||
|
<value>0.01</value>
|
||||||
|
<description>The max percent (0-1) of all tasks that
|
||||||
|
can be speculatively re-executed at any time.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.speculative.minimum-allowed-tasks</name>
|
||||||
|
<value>10</value>
|
||||||
|
<description>The minimum value of the cap on tasks that
|
||||||
|
can be speculatively re-executed at any time.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.speculative.retry-after-no-speculate</name>
|
||||||
|
<value>1000</value>
|
||||||
|
<description>The time to wait (in ms) before the next round of speculation
|
||||||
|
if there is no task speculated in this round.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.speculative.retry-after-speculate</name>
|
||||||
|
<value>15000</value>
|
||||||
|
<description>The time to wait (in ms) before the next round of speculation
|
||||||
|
if there are tasks speculated in this round.</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.job.map.output.collector.class</name>
|
<name>mapreduce.job.map.output.collector.class</name>
|
||||||
<value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
|
<value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
|
||||||
|
@ -762,16 +791,6 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>mapreduce.job.speculative.slownodethreshold</name>
|
|
||||||
<value>1.0</value>
|
|
||||||
<description>The number of standard deviations by which a Task
|
|
||||||
Tracker's ave map and reduce progress-rates (finishTime-dispatchTime)
|
|
||||||
must be lower than the average of all successful map/reduce task's for
|
|
||||||
the TT to be considered too slow to give a speculative task to.
|
|
||||||
</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.job.jvm.numtasks</name>
|
<name>mapreduce.job.jvm.numtasks</name>
|
||||||
<value>1</value>
|
<value>1</value>
|
||||||
|
|
Loading…
Reference in New Issue