MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles)
Signed-off-by: Jonathan Eagles <jeagles@gmail.com>
parent ec5c102ba7
commit c59b1b66a5
@@ -71,8 +71,22 @@ public synchronized double count() {
     return count;
   }
 
+  /**
+   * Calculates the mean value within the 95% confidence interval.
+   * 1.96 is the standard z-value for 95%.
+   *
+   * @return the mean value adding the 95% confidence interval
+   */
+  public synchronized double meanCI() {
+    if (count <= 1) return 0.0;
+    double currMean = mean();
+    double currStd = std();
+    return currMean + (1.96 * currStd / Math.sqrt(count));
+  }
+
   public String toString() {
-    return "DataStatistics: count is " + count + ", sum is " + sum +
-        ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+    return "DataStatistics: count is " + count + ", sum is " + sum
+        + ", sumSquares is " + sumSquares + " mean is " + mean()
+        + " std() is " + std() + ", meanCI() is " + meanCI();
   }
 }
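For intuition, here is a minimal standalone sketch of the same upper-confidence-bound arithmetic, using hypothetical sample statistics (none of these numbers come from the patch):

    // With count = 25 recorded runtimes, mean = 1000 ms, std = 200 ms:
    double count = 25.0;
    double mean = 1000.0;  // hypothetical mean task runtime in ms
    double std = 200.0;    // hypothetical standard deviation in ms
    // 1000 + (1.96 * 200 / sqrt(25)) = 1000 + 78.4 = 1078.4 ms
    double meanCI = mean + (1.96 * std / Math.sqrt(count));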
@@ -414,7 +414,8 @@ private long speculationValue(TaskId taskID, long now) {
         if (estimatedRunTime == data.getEstimatedRunTime()
             && progress == data.getProgress()) {
           // Previous stats are the same as the current stats
-          if (data.notHeartbeatedInAWhile(now)) {
+          if (data.notHeartbeatedInAWhile(now)
+              || estimator.hasStagnatedProgress(runningTaskAttemptID, now)) {
             // Stats have stagnated for a while, simulate heart-beat.
             TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
             taskAttemptStatus.id = runningTaskAttemptID;
@@ -69,4 +69,9 @@ public long runtimeEstimateVariance(TaskAttemptId id) {
     return -1L;
   }
 
+  @Override
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+    return false;
+  }
+
 }
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.speculate.forecast.SimpleExponentialSmoothing;
+
+/**
+ * A task runtime estimator based on exponential smoothing.
+ */
+public class SimpleExponentialTaskRuntimeEstimator extends StartEndTimesBase {
+  private final static long DEFAULT_ESTIMATE_RUNTIME = -1L;
+
+  /**
+   * Constant time used to calculate the smoothing exponential factor.
+   */
+  private long constTime;
+
+  /**
+   * Number of readings before we consider the estimate stable.
+   * Otherwise, the estimate will be skewed due to the initial estimate.
+   */
+  private int skipCount;
+
+  /**
+   * Time window to automatically update the count of the skipCount. This is
+   * needed when a task stalls without any progress, causing the estimator to
+   * return -1 as an estimatedRuntime.
+   */
+  private long stagnatedWindow;
+
+  private final ConcurrentMap<TaskAttemptId,
+      AtomicReference<SimpleExponentialSmoothing>>
+      estimates = new ConcurrentHashMap<>();
+
+  private SimpleExponentialSmoothing getForecastEntry(TaskAttemptId attemptID) {
+    AtomicReference<SimpleExponentialSmoothing> entryRef = estimates
+        .get(attemptID);
+    if (entryRef == null) {
+      return null;
+    }
+    return entryRef.get();
+  }
+
+  private void incorporateReading(TaskAttemptId attemptID,
+      float newRawData, long newTimeStamp) {
+    SimpleExponentialSmoothing foreCastEntry = getForecastEntry(attemptID);
+    if (foreCastEntry == null) {
+      Long tStartTime = startTimes.get(attemptID);
+      // skip if the startTime is not set yet
+      if (tStartTime == null) {
+        return;
+      }
+      estimates.putIfAbsent(attemptID,
+          new AtomicReference<>(SimpleExponentialSmoothing.createForecast(
+              constTime, skipCount, stagnatedWindow,
+              tStartTime)));
+      incorporateReading(attemptID, newRawData, newTimeStamp);
+      return;
+    }
+    foreCastEntry.incorporateReading(newTimeStamp, newRawData);
+  }
+
+  @Override
+  public void contextualize(Configuration conf, AppContext context) {
+    super.contextualize(conf, context);
+
+    constTime
+        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
+        MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS);
+
+    stagnatedWindow = Math.max(2 * constTime, conf.getLong(
+        MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS,
+        MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS));
+
+    skipCount = conf
+        .getInt(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS,
+            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS);
+  }
+
+  @Override
+  public long estimatedRuntime(TaskAttemptId id) {
+    SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id);
+    if (foreCastEntry == null) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    // TODO: What should we do when estimate is zero
+    double remainingWork = Math.min(1.0, 1.0 - foreCastEntry.getRawData());
+    double forecast = foreCastEntry.getForecast();
+    if (forecast <= 0.0) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    long remainingTime = (long) (remainingWork / forecast);
+    long estimatedRuntime = remainingTime
+        + foreCastEntry.getTimeStamp()
+        - foreCastEntry.getStartTime();
+    return estimatedRuntime;
+  }
+
+  @Override
+  public long estimatedNewAttemptRuntime(TaskId id) {
+    DataStatistics statistics = dataStatisticsForTask(id);
+
+    if (statistics == null) {
+      return -1L;
+    }
+
+    double statsMeanCI = statistics.meanCI();
+    double expectedVal =
+        statsMeanCI + Math.min(statsMeanCI * 0.25, statistics.std() / 2);
+    return (long) (expectedVal);
+  }
+
+  @Override
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+    SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id);
+    if (foreCastEntry == null) {
+      return false;
+    }
+    return foreCastEntry.isDataStagnated(timeStamp);
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TaskAttemptId id) {
+    SimpleExponentialSmoothing forecastEntry = getForecastEntry(id);
+    if (forecastEntry == null) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    double forecast = forecastEntry.getForecast();
+    if (forecastEntry.isDefaultForecast(forecast)) {
+      return DEFAULT_ESTIMATE_RUNTIME;
+    }
+    // TODO: What is the best way to measure variance in runtime
+    return 0L;
+  }
+
+  @Override
+  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
+    TaskAttemptId attemptID = status.id;
+
+    float progress = status.progress;
+
+    incorporateReading(attemptID, progress, timestamp);
+  }
+}
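estimatedRuntime() treats the forecast as a progress rate per millisecond: it divides the remaining work (1 minus the latest progress reading) by that rate and adds the time already elapsed. A small worked sketch with hypothetical readings (none of these numbers come from the patch):

    // Attempt started at t = 0 ms; last reading at t = 60000 ms with
    // progress 0.30; smoothed rate of 0.000005 progress/ms.
    double remainingWork = Math.min(1.0, 1.0 - 0.30);        // 0.70
    double forecast = 0.000005;                              // progress per ms
    long remainingTime = (long) (remainingWork / forecast);  // 140000 ms
    long estimatedRuntime = remainingTime + 60000L - 0L;     // 200000 ms total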
@@ -152,8 +152,7 @@ public long estimatedNewAttemptRuntime(TaskId id) {
     if (statistics == null) {
       return -1L;
     }
-
-    return (long) statistics.mean();
+    return (long)statistics.mean();
   }
 
   @Override
@@ -208,4 +207,9 @@ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
       }
     }
   }
+
+  @Override
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) {
+    return false;
+  }
 }
@@ -87,4 +87,17 @@ public interface TaskRuntimeEstimator {
    *
    */
   public long runtimeEstimateVariance(TaskAttemptId id);
+
+  /**
+   *
+   * Returns true if the estimator has no update records for a threshold time
+   * window. This helps to identify task attempts that are stalled at the
+   * beginning of execution.
+   *
+   * @param id the {@link TaskAttemptId} of the attempt we are asking about
+   * @param timeStamp the time of the report we compare with
+   * @return true if the task attempt has no progress for a given time window
+   *
+   */
+  public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp);
 }
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Implementation of the static model for Simple exponential smoothing.
+ */
+public class SimpleExponentialSmoothing {
+  public final static double DEFAULT_FORECAST = -1.0;
+  private final int kMinimumReads;
+  private final long kStagnatedWindow;
+  private final long startTime;
+  private long timeConstant;
+
+  private AtomicReference<ForecastRecord> forecastRefEntry;
+
+  public static SimpleExponentialSmoothing createForecast(long timeConstant,
+      int skipCnt, long stagnatedWindow, long timeStamp) {
+    return new SimpleExponentialSmoothing(timeConstant, skipCnt,
+        stagnatedWindow, timeStamp);
+  }
+
+  SimpleExponentialSmoothing(long ktConstant, int skipCnt,
+      long stagnatedWindow, long timeStamp) {
+    kMinimumReads = skipCnt;
+    kStagnatedWindow = stagnatedWindow;
+    this.timeConstant = ktConstant;
+    this.startTime = timeStamp;
+    this.forecastRefEntry = new AtomicReference<ForecastRecord>(null);
+  }
+
+  private class ForecastRecord {
+    private double alpha;
+    private long timeStamp;
+    private double sample;
+    private double rawData;
+    private double forecast;
+    private double sseError;
+    private long myIndex;
+
+    ForecastRecord(double forecast, double rawData, long timeStamp) {
+      this(0.0, forecast, rawData, forecast, timeStamp, 0.0, 0);
+    }
+
+    ForecastRecord(double alpha, double sample, double rawData,
+        double forecast, long timeStamp, double accError, long index) {
+      this.timeStamp = timeStamp;
+      this.alpha = alpha;
+      this.sseError = 0.0;
+      this.sample = sample;
+      this.forecast = forecast;
+      this.rawData = rawData;
+      this.sseError = accError;
+      this.myIndex = index;
+    }
+
+    private double preProcessRawData(double rData, long newTime) {
+      return processRawData(this.rawData, this.timeStamp, rData, newTime);
+    }
+
+    public ForecastRecord append(long newTimeStamp, double rData) {
+      if (this.timeStamp > newTimeStamp) {
+        return this;
+      }
+      double newSample = preProcessRawData(rData, newTimeStamp);
+      long deltaTime = this.timeStamp - newTimeStamp;
+      if (this.myIndex == kMinimumReads) {
+        timeConstant = Math.max(timeConstant, newTimeStamp - startTime);
+      }
+      double smoothFactor =
+          1 - Math.exp(((double) deltaTime) / timeConstant);
+      double forecastVal =
+          smoothFactor * newSample + (1.0 - smoothFactor) * this.forecast;
+      double newSSEError =
+          this.sseError + Math.pow(newSample - this.forecast, 2);
+      return new ForecastRecord(smoothFactor, newSample, rData, forecastVal,
+          newTimeStamp, newSSEError, this.myIndex + 1);
+    }
+  }
+
+  public boolean isDataStagnated(long timeStamp) {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null && rec.myIndex <= kMinimumReads) {
+      return (rec.timeStamp + kStagnatedWindow) < timeStamp;
+    }
+    return false;
+  }
+
+  static double processRawData(double oldRawData, long oldTime,
+      double newRawData, long newTime) {
+    double rate = (newRawData - oldRawData) / (newTime - oldTime);
+    return rate;
+  }
+
+  public void incorporateReading(long timeStamp, double rawData) {
+    ForecastRecord oldRec = forecastRefEntry.get();
+    if (oldRec == null) {
+      double oldForecast =
+          processRawData(0, startTime, rawData, timeStamp);
+      forecastRefEntry.compareAndSet(null,
+          new ForecastRecord(oldForecast, 0.0, startTime));
+      incorporateReading(timeStamp, rawData);
+      return;
+    }
+    while (!forecastRefEntry.compareAndSet(oldRec, oldRec.append(timeStamp,
+        rawData))) {
+      oldRec = forecastRefEntry.get();
+    }
+  }
+
+  public double getForecast() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null && rec.myIndex > kMinimumReads) {
+      return rec.forecast;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  public boolean isDefaultForecast(double value) {
+    return value == DEFAULT_FORECAST;
+  }
+
+  public double getSSE() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.sseError;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  public boolean isErrorWithinBound(double bound) {
+    double squaredErr = getSSE();
+    if (squaredErr < 0) {
+      return false;
+    }
+    return bound > squaredErr;
+  }
+
+  public double getRawData() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.rawData;
+    }
+    return DEFAULT_FORECAST;
+  }
+
+  public long getTimeStamp() {
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      return rec.timeStamp;
+    }
+    return 0L;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public AtomicReference<ForecastRecord> getForecastRefEntry() {
+    return forecastRefEntry;
+  }
+
+  @Override
+  public String toString() {
+    String res = "NULL";
+    ForecastRecord rec = forecastRefEntry.get();
+    if (rec != null) {
+      res = "rec.index = " + rec.myIndex + ", forecast t: " + rec.timeStamp +
+          ", forecast: " + rec.forecast
+          + ", sample: " + rec.sample + ", raw: " + rec.rawData + ", error: "
+          + rec.sseError + ", alpha: " + rec.alpha;
+    }
+    return res;
+  }
+}
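The core of ForecastRecord.append() is a time-aware exponential smoothing update: deltaTime = oldTimeStamp - newTimeStamp is negative, so the smoothing factor 1 - exp(deltaTime / timeConstant) lies in (0, 1) and grows toward 1 as the gap between readings widens, weighting a stale forecast less. A standalone sketch of that update with hypothetical numbers:

    // Reading arrives 1000 ms after the previous one; timeConstant = 10000 ms.
    long deltaTime = -1000L;
    long timeConstant = 10000L;
    double alpha = 1 - Math.exp(((double) deltaTime) / timeConstant); // ~0.095
    double newSample = 0.000005;   // newly observed progress rate per ms
    double oldForecast = 0.000004; // previous smoothed rate
    double forecast = alpha * newSample + (1.0 - alpha) * oldForecast;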
@@ -59,6 +59,7 @@
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
@@ -255,6 +256,13 @@ public void testExponentialEstimator() throws Exception {
     coreTestEstimator(specificEstimator, 3);
   }
 
+  @Test
+  public void testSimpleExponentialEstimator() throws Exception {
+    TaskRuntimeEstimator specificEstimator
+        = new SimpleExponentialTaskRuntimeEstimator();
+    coreTestEstimator(specificEstimator, 3);
+  }
+
   int taskTypeSlots(TaskType type) {
     return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
   }
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.speculate.forecast;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testing the statistical model of the simple exponential estimator.
+ */
+public class TestSimpleExponentialForecast {
+  private static final Log LOG =
+      LogFactory.getLog(TestSimpleExponentialForecast.class);
+
+  private static long clockTicks = 1000L;
+  private ControlledClock clock;
+
+  private int incTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(clockTicks);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(10000,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    while (progress <= 1.0) {
+      clock.tickMsec(clockTicks);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += 0.005;
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  private int decTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(clockTicks);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(800,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.001};
+    while (progress <= 1.0) {
+      clock.tickMsec(clockTicks);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      progress += progressRates[(int)(progress / 0.25)];
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  private int zeroTestSimpleExponentialForecast() {
+    clock = new ControlledClock();
+    clock.tickMsec(clockTicks);
+    SimpleExponentialSmoothing forecaster =
+        new SimpleExponentialSmoothing(800,
+            12, 10000, clock.getTime());
+
+    double progress = 0.0;
+
+    double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.0, 0.003};
+    int progressInd = 0;
+    while (progress <= 1.0) {
+      clock.tickMsec(clockTicks);
+      forecaster.incorporateReading(clock.getTime(), progress);
+      LOG.info("progress: " + progress + " --> " + forecaster.toString());
+      int currInd = progressInd++ > 1000 ? 4 : (int)(progress / 0.25);
+      progress += progressRates[currInd];
+    }
+
+    return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1;
+  }
+
+  @Test
+  public void testSimpleExponentialForecastLinearInc() throws Exception {
+    int res = incTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        res, 0);
+  }
+
+  @Test
+  public void testSimpleExponentialForecastLinearDec() throws Exception {
+    int res = decTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        res, 0);
+  }
+
+  @Test
+  public void testSimpleExponentialForecastZeros() throws Exception {
+    int res = zeroTestSimpleExponentialForecast();
+    Assert.assertEquals("We got the wrong estimate from simple exponential.",
+        res, 0);
+  }
+}
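The tests above drive the forecaster with a ControlledClock; the same loop can be run standalone. A minimal sketch, assuming a steady synthetic progress rate (after the 12 skipped initial readings the forecast approaches the true rate of 0.005 per 1000 ms, i.e. about 5.0e-6 progress per millisecond):

    SimpleExponentialSmoothing forecaster =
        SimpleExponentialSmoothing.createForecast(10000L, 12, 10000L, 0L);
    long now = 0L;
    for (int i = 1; i <= 200; i++) {
      now += 1000L;                                   // one reading per second
      forecaster.incorporateReading(now, i * 0.005);  // steady 0.5% per second
    }
    System.out.println(forecaster.getForecast());     // ~5.0e-6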
@@ -795,6 +795,37 @@ public interface MRJobConfig {
   public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
     MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
 
+  /** The lambda value in the smoothing function of the task estimator.*/
+  String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS =
+      MR_AM_PREFIX
+          + "job.task.estimator.simple.exponential.smooth.lambda-ms";
+  long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS = 1000L * 120;
+
+  /**
+   * The window length in the simple exponential smoothing after which the
+   * task attempt is considered stagnated.
+   */
+  String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS =
+      MR_AM_PREFIX
+          + "job.task.estimator.simple.exponential.smooth.stagnated-ms";
+  long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS =
+      1000L * 360;
+
+  /**
+   * The number of initial readings that the estimator ignores before giving a
+   * prediction. At the beginning the smooth estimator won't be accurate in
+   * prediction.
+   */
+  String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS =
+      MR_AM_PREFIX
+          + "job.task.estimator.simple.exponential.smooth.skip-initials";
+
+  /**
+   * The default number of readings the estimator is going to ignore before
+   * returning the smooth exponential prediction.
+   */
+  int DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS = 24;
+
   /** The number of threads used to handle task RPC calls.*/
   public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
     MR_AM_PREFIX + "job.task.listener.thread-count";
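A hedged sketch of wiring these keys up in a job configuration. It assumes the pre-existing MRJobConfig.MR_AM_TASK_ESTIMATOR key is the one that selects the estimator class; the values shown are illustrative, not recommendations:

    Configuration conf = new Configuration();
    // Select the new estimator (assumes the existing estimator-class key).
    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
        SimpleExponentialTaskRuntimeEstimator.class, TaskRuntimeEstimator.class);
    // Tune the smoothing lambda, stagnation window, and skipped readings.
    conf.setLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
        1000L * 120);
    conf.setLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS,
        1000L * 360);
    conf.setInt(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS, 24);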
@ -0,0 +1,935 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.v2;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.NullWritable;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.InputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.InputSplit;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
|
import org.apache.hadoop.mapreduce.JobStatus;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
import org.apache.hadoop.mapreduce.Partitioner;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordReader;
|
||||||
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test speculation on Mini Cluster.
|
||||||
|
*/
|
||||||
|
@Ignore
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class TestSpeculativeExecOnCluster {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestSpeculativeExecOnCluster.class);
|
||||||
|
|
||||||
|
private static final int NODE_MANAGERS_COUNT = 2;
|
||||||
|
private static final boolean ENABLE_SPECULATIVE_MAP = true;
|
||||||
|
private static final boolean ENABLE_SPECULATIVE_REDUCE = true;
|
||||||
|
|
||||||
|
private static final int NUM_MAP_DEFAULT = 8 * NODE_MANAGERS_COUNT;
|
||||||
|
private static final int NUM_REDUCE_DEFAULT = NUM_MAP_DEFAULT / 2;
|
||||||
|
private static final int MAP_SLEEP_TIME_DEFAULT = 60000;
|
||||||
|
private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000;
|
||||||
|
private static final int MAP_SLEEP_COUNT_DEFAULT = 10000;
|
||||||
|
private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000;
|
||||||
|
|
||||||
|
private static final String MAP_SLEEP_COUNT =
|
||||||
|
"mapreduce.sleepjob.map.sleep.count";
|
||||||
|
private static final String REDUCE_SLEEP_COUNT =
|
||||||
|
"mapreduce.sleepjob.reduce.sleep.count";
|
||||||
|
private static final String MAP_SLEEP_TIME =
|
||||||
|
"mapreduce.sleepjob.map.sleep.time";
|
||||||
|
private static final String REDUCE_SLEEP_TIME =
|
||||||
|
"mapreduce.sleepjob.reduce.sleep.time";
|
||||||
|
private static final String MAP_SLEEP_CALCULATOR_TYPE =
|
||||||
|
"mapreduce.sleepjob.map.sleep.time.calculator";
|
||||||
|
private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run";
|
||||||
|
|
||||||
|
private static Map<String, SleepDurationCalculator> mapSleepTypeMapper;
|
||||||
|
|
||||||
|
|
||||||
|
private static FileSystem localFs;
|
||||||
|
|
||||||
|
static {
|
||||||
|
mapSleepTypeMapper = new HashMap<>();
|
||||||
|
mapSleepTypeMapper.put("normal_run", new SleepDurationCalcImpl());
|
||||||
|
mapSleepTypeMapper.put("stalled_run",
|
||||||
|
new StalledSleepDurationCalcImpl());
|
||||||
|
mapSleepTypeMapper.put("slowing_run",
|
||||||
|
new SlowingSleepDurationCalcImpl());
|
||||||
|
mapSleepTypeMapper.put("dynamic_slowing_run",
|
||||||
|
new DynamicSleepDurationCalcImpl());
|
||||||
|
mapSleepTypeMapper.put("step_stalled_run",
|
||||||
|
new StepStalledSleepDurationCalcImpl());
|
||||||
|
try {
|
||||||
|
localFs = FileSystem.getLocal(new Configuration());
|
||||||
|
} catch (IOException io) {
|
||||||
|
throw new RuntimeException("problem getting local fs", io);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Path TEST_ROOT_DIR =
|
||||||
|
new Path("target",
|
||||||
|
TestSpeculativeExecOnCluster.class.getName() + "-tmpDir")
|
||||||
|
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
|
||||||
|
private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
|
||||||
|
private static final Path TEST_OUT_DIR =
|
||||||
|
new Path(TEST_ROOT_DIR, "test.out.dir");
|
||||||
|
|
||||||
|
private MiniMRYarnCluster mrCluster;
|
||||||
|
|
||||||
|
private int myNumMapper;
|
||||||
|
private int myNumReduce;
|
||||||
|
private int myMapSleepTime;
|
||||||
|
private int myReduceSleepTime;
|
||||||
|
private int myMapSleepCount;
|
||||||
|
private int myReduceSleepCount;
|
||||||
|
private String chosenSleepCalc;
|
||||||
|
private Class<?> estimatorClass;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The test cases take a long time to run all the estimators against all the
|
||||||
|
* cases. We skip the legacy estimators to reduce the execution time.
|
||||||
|
*/
|
||||||
|
private List<String> ignoredTests;
|
||||||
|
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
|
||||||
|
public static Collection<Object[]> getTestParameters() {
|
||||||
|
List<String> ignoredTests = Arrays.asList(new String[] {
|
||||||
|
"stalled_run",
|
||||||
|
"slowing_run",
|
||||||
|
"step_stalled_run"
|
||||||
|
});
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
{SimpleExponentialTaskRuntimeEstimator.class, ignoredTests,
|
||||||
|
NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT},
|
||||||
|
{LegacyTaskRuntimeEstimator.class, ignoredTests,
|
||||||
|
NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestSpeculativeExecOnCluster(
|
||||||
|
Class<? extends TaskRuntimeEstimator> estimatorKlass,
|
||||||
|
List<String> testToIgnore,
|
||||||
|
Integer numMapper,
|
||||||
|
Integer numReduce) {
|
||||||
|
this.ignoredTests = testToIgnore;
|
||||||
|
this.estimatorClass = estimatorKlass;
|
||||||
|
this.myNumMapper = numMapper;
|
||||||
|
this.myNumReduce = numReduce;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
|
||||||
|
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
||||||
|
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
||||||
|
+ " not found. Not running test.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mrCluster == null) {
|
||||||
|
mrCluster = new MiniMRYarnCluster(
|
||||||
|
TestSpeculativeExecution.class.getName(), NODE_MANAGERS_COUNT);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
mrCluster.init(conf);
|
||||||
|
mrCluster.start();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// workaround the absent public distcache.
|
||||||
|
localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
|
||||||
|
localFs.setPermission(APP_JAR, new FsPermission("700"));
|
||||||
|
myMapSleepTime = MAP_SLEEP_TIME_DEFAULT;
|
||||||
|
myReduceSleepTime = REDUCE_SLEEP_TIME_DEFAULT;
|
||||||
|
myMapSleepCount = MAP_SLEEP_COUNT_DEFAULT;
|
||||||
|
myReduceSleepCount = REDUCE_SLEEP_COUNT_DEFAULT;
|
||||||
|
chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (mrCluster != null) {
|
||||||
|
mrCluster.stop();
|
||||||
|
mrCluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overrides default behavior of Partitioner for testing.
|
||||||
|
*/
|
||||||
|
public static class SpeculativeSleepJobPartitioner extends
|
||||||
|
Partitioner<IntWritable, NullWritable> {
|
||||||
|
public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
|
||||||
|
return k.get() % numPartitions;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Overrides default behavior of InputSplit for testing.
|
||||||
|
*/
|
||||||
|
public static class EmptySplit extends InputSplit implements Writable {
|
||||||
|
public void write(DataOutput out) throws IOException { }
|
||||||
|
public void readFields(DataInput in) throws IOException { }
|
||||||
|
public long getLength() {
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
public String[] getLocations() {
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Input format that sleeps after updating progress.
|
||||||
|
*/
|
||||||
|
public static class SpeculativeSleepInputFormat
|
||||||
|
extends InputFormat<IntWritable, IntWritable> {
|
||||||
|
|
||||||
|
public List<InputSplit> getSplits(JobContext jobContext) {
|
||||||
|
List<InputSplit> ret = new ArrayList<InputSplit>();
|
||||||
|
int numSplits = jobContext.getConfiguration().
|
||||||
|
getInt(MRJobConfig.NUM_MAPS, 1);
|
||||||
|
for (int i = 0; i < numSplits; ++i) {
|
||||||
|
ret.add(new EmptySplit());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecordReader<IntWritable, IntWritable> createRecordReader(
|
||||||
|
InputSplit ignored, TaskAttemptContext taskContext)
|
||||||
|
throws IOException {
|
||||||
|
Configuration conf = taskContext.getConfiguration();
|
||||||
|
final int count = conf.getInt(MAP_SLEEP_COUNT, MAP_SLEEP_COUNT_DEFAULT);
|
||||||
|
if (count < 0) {
|
||||||
|
throw new IOException("Invalid map count: " + count);
|
||||||
|
}
|
||||||
|
final int redcount = conf.getInt(REDUCE_SLEEP_COUNT,
|
||||||
|
REDUCE_SLEEP_COUNT_DEFAULT);
|
||||||
|
if (redcount < 0) {
|
||||||
|
throw new IOException("Invalid reduce count: " + redcount);
|
||||||
|
}
|
||||||
|
final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
|
||||||
|
|
||||||
|
return new RecordReader<IntWritable, IntWritable>() {
|
||||||
|
private int records = 0;
|
||||||
|
private int emitCount = 0;
|
||||||
|
private IntWritable key = null;
|
||||||
|
private IntWritable value = null;
|
||||||
|
public void initialize(InputSplit split, TaskAttemptContext context) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean nextKeyValue()
|
||||||
|
throws IOException {
|
||||||
|
if (count == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
key = new IntWritable();
|
||||||
|
key.set(emitCount);
|
||||||
|
int emit = emitPerMapTask / count;
|
||||||
|
if ((emitPerMapTask) % count > records) {
|
||||||
|
++emit;
|
||||||
|
}
|
||||||
|
emitCount += emit;
|
||||||
|
value = new IntWritable();
|
||||||
|
value.set(emit);
|
||||||
|
return records++ < count;
|
||||||
|
}
|
||||||
|
public IntWritable getCurrentKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
public IntWritable getCurrentValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
public void close() throws IOException { }
|
||||||
|
public float getProgress() throws IOException {
|
||||||
|
return count == 0 ? 100 : records / ((float)count);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface used to simulate different progress rates of the tasks.
|
||||||
|
*/
|
||||||
|
public interface SleepDurationCalculator {
|
||||||
|
long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount,
|
||||||
|
long defaultSleepDuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All tasks have the same progress.
|
||||||
|
*/
|
||||||
|
public static class SleepDurationCalcImpl implements SleepDurationCalculator {
|
||||||
|
|
||||||
|
private double threshold = 1.0;
|
||||||
|
private double slowFactor = 1.0;
|
||||||
|
|
||||||
|
SleepDurationCalcImpl() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public long calcSleepDuration(TaskAttemptID taId, int currCount,
|
||||||
|
int totalCount, long defaultSleepDuration) {
|
||||||
|
if (threshold <= ((double) currCount) / totalCount) {
|
||||||
|
return (long) (slowFactor * defaultSleepDuration);
|
||||||
|
}
|
||||||
|
return defaultSleepDuration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The first attempt of task_0 slows down by a small factor that should not
|
||||||
|
* trigger a speculation. An speculated attempt should never beat the
|
||||||
|
* original task.
|
||||||
|
* A conservative estimator/speculator will speculate another attempt
|
||||||
|
* because of the slower progress.
|
||||||
|
*/
|
||||||
|
public static class SlowingSleepDurationCalcImpl implements
|
||||||
|
SleepDurationCalculator {
|
||||||
|
|
||||||
|
private double threshold = 0.4;
|
||||||
|
private double slowFactor = 1.2;
|
||||||
|
|
||||||
|
SlowingSleepDurationCalcImpl() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public long calcSleepDuration(TaskAttemptID taId, int currCount,
|
||||||
|
int totalCount, long defaultSleepDuration) {
|
||||||
|
if ((taId.getTaskType() == TaskType.MAP)
|
||||||
|
&& (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
|
||||||
|
if (threshold <= ((double) currCount) / totalCount) {
|
||||||
|
return (long) (slowFactor * defaultSleepDuration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return defaultSleepDuration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The progress of the first Mapper task is stalled by 100 times the other
|
||||||
|
* tasks.
|
||||||
|
* The speculated attempt should be succeed if the estimator detects
|
||||||
|
* the slow down on time.
|
||||||
|
*/
|
||||||
|
public static class StalledSleepDurationCalcImpl implements
|
||||||
|
SleepDurationCalculator {
|
||||||
|
|
||||||
|
StalledSleepDurationCalcImpl() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public long calcSleepDuration(TaskAttemptID taId, int currCount,
|
||||||
|
int totalCount, long defaultSleepDuration) {
|
||||||
|
if ((taId.getTaskType() == TaskType.MAP)
|
||||||
|
&& (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
|
||||||
|
return 1000 * defaultSleepDuration;
|
||||||
|
}
|
||||||
|
return defaultSleepDuration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emulates the behavior with a step change in the progress.
|
||||||
|
*/
|
||||||
|
public static class StepStalledSleepDurationCalcImpl implements
|
||||||
|
SleepDurationCalculator {
|
||||||
|
|
||||||
|
private double threshold = 0.4;
|
||||||
|
private double slowFactor = 10000;
|
||||||
|
|
||||||
|
StepStalledSleepDurationCalcImpl() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public long calcSleepDuration(TaskAttemptID taId, int currCount,
|
||||||
|
int totalCount, long defaultSleepDuration) {
|
||||||
|
if ((taId.getTaskType() == TaskType.MAP)
|
||||||
|
&& (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
|
||||||
|
if (threshold <= ((double) currCount) / totalCount) {
|
||||||
|
return (long) (slowFactor * defaultSleepDuration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return defaultSleepDuration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dynamically slows down the progress of the first Mapper task.
|
||||||
|
* The speculated attempt should be succeed if the estimator detects
|
||||||
|
* the slow down on time.
|
||||||
|
*/
|
||||||
|
public static class DynamicSleepDurationCalcImpl implements
|
||||||
|
SleepDurationCalculator {
|
||||||
|
|
||||||
|
private double[] thresholds;
|
||||||
|
private double[] slowFactors;
|
||||||
|
|
||||||
|
DynamicSleepDurationCalcImpl() {
|
||||||
|
thresholds = new double[] {
|
||||||
|
0.1, 0.25, 0.4, 0.5, 0.6, 0.65, 0.7, 0.8, 0.9
|
||||||
|
};
|
||||||
|
slowFactors = new double[] {
|
||||||
|
2.0, 4.0, 5.0, 6.0, 10.0, 15.0, 20.0, 25.0, 30.0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public long calcSleepDuration(TaskAttemptID taId, int currCount,
|
||||||
|
int totalCount,
|
||||||
|
long defaultSleepDuration) {
|
||||||
|
if ((taId.getTaskType() == TaskType.MAP)
|
||||||
|
&& (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) {
|
||||||
|
double currProgress = ((double) currCount) / totalCount;
|
||||||
|
double slowFactor = 1.0;
|
||||||
|
for (int i = 0; i < thresholds.length; i++) {
|
||||||
|
if (thresholds[i] >= currProgress) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
slowFactor = slowFactors[i];
|
||||||
|
}
|
||||||
|
return (long) (slowFactor * defaultSleepDuration);
|
||||||
|
}
|
||||||
|
return defaultSleepDuration;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dummy class for testing Speculation. Sleeps for a defined period
|
||||||
|
* of time in mapper. Generates fake input for map / reduce
|
||||||
|
* jobs. Note that generated number of input pairs is in the order
|
||||||
|
* of <code>numMappers * mapSleepTime / 100</code>, so the job uses
|
||||||
|
* some disk space.
|
||||||
|
* The sleep duration for a given task is going to slowDown to evaluate
|
||||||
|
* the estimator
|
||||||
|
*/
|
||||||
|
public static class SpeculativeSleepMapper
|
||||||
|
extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
|
||||||
|
private long mapSleepDuration = MAP_SLEEP_TIME_DEFAULT;
|
||||||
|
private int mapSleepCount = 1;
|
||||||
|
private int count = 0;
|
||||||
|
private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl();
|
||||||
|
|
||||||
|
protected void setup(Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
this.mapSleepCount =
|
||||||
|
conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
|
||||||
|
this.mapSleepDuration = mapSleepCount == 0 ? 0 :
|
||||||
|
conf.getLong(MAP_SLEEP_TIME, MAP_SLEEP_TIME_DEFAULT) / mapSleepCount;
|
||||||
|
this.sleepCalc =
|
||||||
|
mapSleepTypeMapper.get(conf.get(MAP_SLEEP_CALCULATOR_TYPE,
|
||||||
|
MAP_SLEEP_CALCULATOR_TYPE_DEFAULT));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void map(IntWritable key, IntWritable value, Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
//it is expected that every map processes mapSleepCount number of records.
|
||||||
|
try {
|
||||||
|
context.setStatus("Sleeping... (" +
|
||||||
|
(mapSleepDuration * (mapSleepCount - count)) + ") ms left");
|
||||||
|
long sleepTime = sleepCalc.calcSleepDuration(context.getTaskAttemptID(),
|
||||||
|
count, mapSleepCount,
|
||||||
|
mapSleepDuration);
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
throw (IOException) new IOException(
|
||||||
|
"Interrupted while sleeping").initCause(ex);
|
||||||
|
}
|
||||||
|
++count;
|
||||||
|
// output reduceSleepCount * numReduce number of random values, so that
|
||||||
|
// each reducer will get reduceSleepCount number of keys.
|
||||||
|
int k = key.get();
|
||||||
|
for (int i = 0; i < value.get(); ++i) {
|
||||||
|
context.write(new IntWritable(k + i), NullWritable.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of the reducer task for testing.
|
||||||
|
*/
|
||||||
|
public static class SpeculativeSleepReducer
|
||||||
|
extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
|
||||||
|
|
||||||
|
private long reduceSleepDuration = REDUCE_SLEEP_TIME_DEFAULT;
|
||||||
|
private int reduceSleepCount = 1;
|
||||||
|
private int count = 0;
|
||||||
|
|
||||||
|
protected void setup(Context context)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Configuration conf = context.getConfiguration();
|
||||||
|
this.reduceSleepCount =
|
||||||
|
conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
|
||||||
|
this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
|
||||||
|
conf.getLong(REDUCE_SLEEP_TIME, REDUCE_SLEEP_TIME_DEFAULT)
|
||||||
|
/ reduceSleepCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reduce(IntWritable key, Iterable<NullWritable> values,
|
||||||
|
Context context)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
context.setStatus("Sleeping... (" +
|
||||||
|
(reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
|
||||||
|
Thread.sleep(reduceSleepDuration);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
throw (IOException) new IOException(
|
||||||
|
"Interrupted while sleeping").initCause(ex);
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A class used to map the estimatopr implementation to the expected
|
||||||
|
* test results.
|
||||||
|
*/
|
||||||
|
class EstimatorMetricsPair {
|
||||||
|
|
||||||
|
private Class<?> estimatorClass;
|
||||||
|
private int expectedMapTasks;
|
||||||
|
private int expectedReduceTasks;
|
||||||
|
private boolean speculativeEstimator;
|
||||||
|
|
||||||
|
EstimatorMetricsPair(Class<?> estimatorClass, int mapTasks, int reduceTasks,
|
||||||
|
boolean isToSpeculate) {
|
||||||
|
this.estimatorClass = estimatorClass;
|
||||||
|
this.expectedMapTasks = mapTasks;
|
||||||
|
this.expectedReduceTasks = reduceTasks;
|
||||||
|
this.speculativeEstimator = isToSpeculate;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean didSpeculate(Counters counters) {
|
||||||
|
long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
|
||||||
|
.getValue();
|
||||||
|
long launchedReduce = counters
|
||||||
|
.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
|
||||||
|
.getValue();
|
||||||
|
boolean isSpeculated =
|
||||||
|
(launchedMaps > expectedMapTasks
|
||||||
|
|| launchedReduce > expectedReduceTasks);
|
||||||
|
return isSpeculated;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getErrorMessage(Counters counters) {
|
||||||
|
String msg = "Unexpected tasks running estimator "
|
||||||
|
+ estimatorClass.getName() + "\n\t";
|
||||||
|
long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
|
||||||
|
.getValue();
|
||||||
|
long launchedReduce = counters
|
||||||
|
.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
|
||||||
|
.getValue();
|
||||||
|
if (speculativeEstimator) {
|
||||||
|
if (launchedMaps < expectedMapTasks) {
|
||||||
|
msg += "maps " + launchedMaps + ", expected: " + expectedMapTasks;
|
||||||
|
}
|
||||||
|
if (launchedReduce < expectedReduceTasks) {
|
||||||
|
msg += ", reduces " + launchedReduce + ", expected: "
|
||||||
|
+ expectedReduceTasks;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (launchedMaps > expectedMapTasks) {
|
||||||
|
msg += "maps " + launchedMaps + ", expected: " + expectedMapTasks;
|
||||||
|
}
|
||||||
|
if (launchedReduce > expectedReduceTasks) {
|
||||||
|
msg += ", reduces " + launchedReduce + ", expected: "
|
||||||
|
+ expectedReduceTasks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExecDynamicSlowingSpeculative() throws Exception {
|
||||||
|
/*------------------------------------------------------------------
|
||||||
|
* Test that Map/Red speculates because:
|
||||||
|
* 1- all tasks have same progress rate except for task_0
|
||||||
|
* 2- task_0 slows down by dynamic increasing factor
|
||||||
|
* 3- A good estimator should readjust the estimation and the speculator
|
||||||
|
* launches a new task.
|
||||||
|
*
|
||||||
|
* Expected:
|
||||||
|
* A- SimpleExponentialTaskRuntimeEstimator: speculates a successful
|
||||||
|
* attempt to beat the slowing task_0
|
||||||
|
* B- LegacyTaskRuntimeEstimator: speculates an attempt
|
||||||
|
* C- ExponentiallySmoothedTaskRuntimeEstimator: Fails to detect the slow
|
||||||
|
* down and never speculates but it may speculate other tasks
|
||||||
|
* (mappers or reducers)
|
||||||
|
* -----------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
chosenSleepCalc = "dynamic_slowing_run";
|
||||||
|
|
||||||
|
if (ignoredTests.contains(chosenSleepCalc)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
|
||||||
|
new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
|
||||||
|
myNumMapper, myNumReduce, true),
|
||||||
|
new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
|
||||||
|
myNumMapper, myNumReduce, true),
|
||||||
|
new EstimatorMetricsPair(
|
||||||
|
ExponentiallySmoothedTaskRuntimeEstimator.class,
|
||||||
|
myNumMapper, myNumReduce, true)
|
||||||
|
};
|
||||||
|
|
||||||
|
for (EstimatorMetricsPair specEstimator : estimatorPairs) {
|
||||||
|
if (!estimatorClass.equals(specEstimator.estimatorClass)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("+++ Dynamic Slow Progress testing against " + estimatorClass
|
||||||
|
.getName() + " +++");
|
||||||
|
Job job = runSpecTest();
|
||||||
|
|
||||||
|
boolean succeeded = job.waitForCompletion(true);
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Job expected to succeed with estimator " + estimatorClass.getName(),
|
||||||
|
succeeded);
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Job expected to succeed with estimator " + estimatorClass.getName(),
|
||||||
|
JobStatus.State.SUCCEEDED, job.getJobState());
|
||||||
|
Counters counters = job.getCounters();
|
||||||
|
|
||||||
|
String errorMessage = specEstimator.getErrorMessage(counters);
|
||||||
|
boolean didSpeculate = specEstimator.didSpeculate(counters);
|
||||||
|
Assert.assertEquals(errorMessage, didSpeculate,
|
||||||
|
specEstimator.speculativeEstimator);
|
||||||
|
Assert
|
||||||
|
.assertEquals("Failed maps higher than 0 " + estimatorClass.getName(),
|
||||||
|
0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
|
||||||
|
}
|
||||||
|
}

  @Test
  public void testExecSlowNonSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red does not speculate because:
     * 1- all tasks have the same progress rate except for task_0
     * 2- task_0 slows down by 0.5 after 50% of the workload
     * 3- a good estimator should adjust its estimate and predict that the
     *    running task will still finish sooner than a newly speculated
     *    attempt (a sketch of this comparison follows the method).
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: does not speculate, because
     *    the new attempt's estimated end time would not be smaller than the
     *    original attempt's end time.
     * B- LegacyTaskRuntimeEstimator: speculates an attempt
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates an attempt.
     * -----------------------------------------------------------------
     */
    chosenSleepCalc = "slowing_run";

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }

    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, false),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ Linear Slow Progress Non Speculative testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue(
          "Job expected to succeed with estimator " + estimatorClass.getName(),
          succeeded);
      Assert.assertEquals(
          "Job expected to succeed with estimator " + estimatorClass.getName(),
          JobStatus.State.SUCCEEDED, job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      Assert.assertEquals(errorMessage, didSpeculate,
          specEstimator.speculativeEstimator);
      Assert.assertEquals(
          "Failed maps higher than 0 " + estimatorClass.getName(), 0,
          counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
    }
  }
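
  // Editor's sketch (illustrative, not the Speculator's actual code path):
  // the decision described in the comment above reduces to comparing the
  // running attempt's estimated remaining time against the expected runtime
  // of a fresh attempt. All names below are hypothetical.
  private static boolean worthSpeculating(long now,
      long estimatedEndOfRunningAttempt, long meanRuntimeOfNewAttempt) {
    long estimatedRemaining = estimatedEndOfRunningAttempt - now;
    // Speculate only when a brand-new attempt is expected to beat the running
    // one; a task that slowed but is still converging should fail this check.
    return estimatedRemaining > meanRuntimeOfNewAttempt;
  }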

  @Test
  public void testExecStepStalledSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red speculates because:
     * 1- all tasks have the same progress rate except for task_0
     * 2- task_0 stalls on a step with a long sleep duration
     * 3- a good estimator should detect the stall and predict that a newly
     *    speculated attempt will finish sooner than the stalled one.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: speculates
     * B- LegacyTaskRuntimeEstimator: speculates
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
     * -----------------------------------------------------------------
     */
    chosenSleepCalc = "step_stalled_run";
    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }
    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ Stalled Progress testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue("Job expected to succeed with estimator "
          + estimatorClass.getName(), succeeded);
      Assert.assertEquals("Job expected to succeed with estimator "
          + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
          job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      Assert.assertEquals(errorMessage, didSpeculate,
          specEstimator.speculativeEstimator);
      Assert.assertEquals("Failed maps higher than 0 "
          + estimatorClass.getName(), 0,
          counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
    }
  }

  @Test
  public void testExecStalledSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red speculates because:
     * 1- all tasks have the same progress rate except for task_0
     * 2- task_0 has a long sleep duration
     * 3- a good estimator should detect the stall and predict that a newly
     *    speculated attempt will finish sooner than the stalled one.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: speculates
     * B- LegacyTaskRuntimeEstimator: speculates
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
     * -----------------------------------------------------------------
     */
    chosenSleepCalc = "stalled_run";

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }
    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ Stalled Progress testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue("Job expected to succeed with estimator "
          + estimatorClass.getName(), succeeded);
      Assert.assertEquals("Job expected to succeed with estimator "
          + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
          job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      Assert.assertEquals(errorMessage, didSpeculate,
          specEstimator.speculativeEstimator);
      Assert.assertEquals("Failed maps higher than 0 "
          + estimatorClass.getName(), 0,
          counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
    }
  }

  @Test
  public void testExecNonSpeculative() throws Exception {
    /*------------------------------------------------------------------
     * Test that Map/Red does not speculate because all tasks progress at
     * the same rate.
     *
     * Expected:
     * A- SimpleExponentialTaskRuntimeEstimator: does not speculate (a sketch
     *    of the smoothing recurrence follows this method)
     * B- LegacyTaskRuntimeEstimator: speculates
     * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates
     * -----------------------------------------------------------------
     */
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    if (ignoredTests.contains(chosenSleepCalc)) {
      return;
    }

    EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] {
        new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true),
        new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, false),
        new EstimatorMetricsPair(
            ExponentiallySmoothedTaskRuntimeEstimator.class,
            myNumMapper, myNumReduce, true)
    };

    for (EstimatorMetricsPair specEstimator : estimatorPairs) {
      if (!estimatorClass.equals(specEstimator.estimatorClass)) {
        continue;
      }
      LOG.info("+++ No Speculation testing against "
          + estimatorClass.getName() + " +++");
      Job job = runSpecTest();

      boolean succeeded = job.waitForCompletion(true);
      Assert.assertTrue("Job expected to succeed with estimator "
          + estimatorClass.getName(), succeeded);
      Assert.assertEquals("Job expected to succeed with estimator "
          + estimatorClass.getName(), JobStatus.State.SUCCEEDED,
          job.getJobState());
      Counters counters = job.getCounters();

      String errorMessage = specEstimator.getErrorMessage(counters);
      boolean didSpeculate = specEstimator.didSpeculate(counters);
      Assert.assertEquals(errorMessage, didSpeculate,
          specEstimator.speculativeEstimator);
    }
  }
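
  // Editor's sketch: SimpleExponentialTaskRuntimeEstimator's edge in this
  // test comes from forecasting with exponential smoothing rather than a
  // lifetime average. This is the textbook recurrence, not the estimator's
  // actual implementation.
  private static double smoothedRate(double[] observedRates, double alpha) {
    double forecast = observedRates[0];
    for (int i = 1; i < observedRates.length; i++) {
      // Recent observations dominate; with uniform progress rates the
      // forecast equals the common rate and no task looks like an outlier.
      forecast = alpha * observedRates[i] + (1 - alpha) * forecast;
    }
    return forecast;
  }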

  private Job runSpecTest()
      throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = mrCluster.getConfig();
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, ENABLE_SPECULATIVE_MAP);
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, ENABLE_SPECULATIVE_REDUCE);
    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
        estimatorClass,
        TaskRuntimeEstimator.class);
    conf.setLong(MAP_SLEEP_TIME, myMapSleepTime);
    conf.setLong(REDUCE_SLEEP_TIME, myReduceSleepTime);
    conf.setInt(MAP_SLEEP_COUNT, myMapSleepCount);
    conf.setInt(REDUCE_SLEEP_COUNT, myReduceSleepCount);
    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
    conf.setInt(MRJobConfig.NUM_MAPS, myNumMapper);
    conf.set(MAP_SLEEP_CALCULATOR_TYPE, chosenSleepCalc);
    Job job = Job.getInstance(conf);
    job.setJarByClass(TestSpeculativeExecution.class);
    job.setMapperClass(SpeculativeSleepMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(SpeculativeSleepReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setInputFormatClass(SpeculativeSleepInputFormat.class);
    job.setPartitionerClass(SpeculativeSleepJobPartitioner.class);
    job.setNumReduceTasks(myNumReduce);
    FileInputFormat.addInputPath(job, new Path("ignored"));
    // Delete the output directory if it exists.
    try {
      localFs.delete(TEST_OUT_DIR, true);
    } catch (IOException e) {
      // ignore
    }
    FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);

    // Add the AppMaster jar itself to the job classpath.
    job.addFileToClassPath(APP_JAR);
    job.setMaxMapAttempts(2);

    job.submit();

    return job;
  }
}
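
For reference, enabling one of these estimators outside the test uses the same
knob runSpecTest() sets. A minimal sketch, assuming only classes and keys that
appear in this change:

    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true);
    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
        SimpleExponentialTaskRuntimeEstimator.class,
        TaskRuntimeEstimator.class);
    Job job = Job.getInstance(conf);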

@@ -18,12 +18,17 @@
 package org.apache.hadoop.mapreduce.v2;

+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;

+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -48,13 +53,31 @@
 import org.junit.Test;

 import com.google.common.base.Supplier;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

 @SuppressWarnings({ "unchecked", "rawtypes" })
+@RunWith(Parameterized.class)
 public class TestSpeculativeExecutionWithMRApp {

   private static final int NUM_MAPPERS = 5;
   private static final int NUM_REDUCERS = 0;

+  @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
+  public static Collection<Object[]> getTestParameters() {
+    return Arrays.asList(new Object[][] {
+        {SimpleExponentialTaskRuntimeEstimator.class},
+        {LegacyTaskRuntimeEstimator.class}
+    });
+  }
+
+  private Class<? extends TaskRuntimeEstimator> estimatorClass;
+
+  public TestSpeculativeExecutionWithMRApp(
+      Class<? extends TaskRuntimeEstimator> estimatorKlass) {
+    this.estimatorClass = estimatorKlass;
+  }
+
   @Test
   public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
@@ -64,7 +87,7 @@ public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {

     MRApp app =
         new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
-    Job job = app.submit(new Configuration(), true, true);
+    Job job = app.submit(createConfiguration(), true, true);
     app.waitForState(job, JobState.RUNNING);

     Map<TaskId, Task> tasks = job.getTasks();
@@ -136,7 +159,7 @@ public void testSepculateSuccessfulWithUpdateEvents() throws Exception {

     MRApp app =
         new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
-    Job job = app.submit(new Configuration(), true, true);
+    Job job = app.submit(createConfiguration(), true, true);
     app.waitForState(job, JobState.RUNNING);

     Map<TaskId, Task> tasks = job.getTasks();
@@ -191,6 +214,9 @@ public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
     }

     clock.setTime(System.currentTimeMillis() + 15000);
+    // give the speculator thread a chance to run a scan before we proceed
+    // with updating events
+    Thread.yield();
     for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
       for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
           .getAttempts().entrySet()) {
@@ -251,4 +277,20 @@ private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,
     status.taskState = state;
     return status;
   }
+
+  private Configuration createConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+        estimatorClass,
+        TaskRuntimeEstimator.class);
+    if (SimpleExponentialTaskRuntimeEstimator.class.equals(estimatorClass)) {
+      // set configurations specific to the SimpleExponential estimator
+      conf.setInt(
+          MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS, 1);
+      conf.setLong(
+          MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS,
+          1000L * 10);
+    }
+    return conf;
+  }
 }
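
The SKIP_INITIALS and LAMBDA_MS keys above are the SimpleExponential
estimator's tuning surface: skip the first, noisiest sample(s) and smooth over
a ten-second window. As a hedged illustration of how such a lambda window can
map to a smoothing factor (the textbook relationship, not code taken from the
estimator):

    // Samples arriving after a long gap count for more; as deltaTimeMs grows
    // past lambdaMs, alpha approaches 1 and the forecast tracks the newest
    // observation.
    static double alphaFor(long deltaTimeMs, long lambdaMs) {
      return 1.0 - Math.exp(-(double) deltaTimeMs / (double) lambdaMs);
    }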