From 56988e88f7b4cc67b9aa03cfc71aaa5f05c0d859 Mon Sep 17 00:00:00 2001 From: Ahmed Hussein Date: Tue, 5 Nov 2019 14:55:20 -0600 Subject: [PATCH] MAPREDUCE-7208. Tuning TaskRuntimeEstimator. (Ahmed Hussein via jeagles) Signed-off-by: Jonathan Eagles (cherry picked from commit ed302f1fed6d124d682486d24dae958946dba9be) --- .../v2/app/speculate/DataStatistics.java | 18 +- .../v2/app/speculate/DefaultSpeculator.java | 3 +- ...SimpleExponentialTaskRuntimeEstimator.java | 170 ++++ .../v2/app/speculate/StartEndTimesBase.java | 3 +- .../app/speculate/TaskRuntimeEstimator.java | 15 + .../forecast/SimpleExponentialSmoothing.java | 196 ++++ .../v2/app/TestRuntimeEstimators.java | 8 + .../TestSimpleExponentialForecast.java | 120 +++ .../apache/hadoop/mapreduce/MRJobConfig.java | 31 + .../v2/TestSpeculativeExecOnCluster.java | 935 ++++++++++++++++++ .../v2/TestSpeculativeExecutionWithMRApp.java | 46 +- 11 files changed, 1538 insertions(+), 7 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java index cfaffaf7eaa..9f1c12243f7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DataStatistics.java @@ -71,8 +71,22 @@ public class DataStatistics { return count; } + /** + * calculates the mean value within 95% ConfidenceInterval. + * 1.96 is standard for 95 % + * + * @return the mean value adding 95% confidence interval + */ + public synchronized double meanCI() { + if (count <= 1) return 0.0; + double currMean = mean(); + double currStd = std(); + return currMean + (1.96 * currStd / Math.sqrt(count)); + } + public String toString() { - return "DataStatistics: count is " + count + ", sum is " + sum + - ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std(); + return "DataStatistics: count is " + count + ", sum is " + sum + + ", sumSquares is " + sumSquares + " mean is " + mean() + + " std() is " + std() + ", meanCI() is " + meanCI(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java index 65736877b51..211bdc4da90 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java @@ -416,7 +416,8 @@ public class DefaultSpeculator extends AbstractService implements if (estimatedRunTime == data.getEstimatedRunTime() && progress == data.getProgress()) { // Previous stats are same as same stats - if (data.notHeartbeatedInAWhile(now)) { + if (data.notHeartbeatedInAWhile(now) + || estimator.hasStagnatedProgress(runningTaskAttemptID, now)) { // Stats have stagnated for a while, simulate heart-beat. TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); taskAttemptStatus.id = runningTaskAttemptID; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java new file mode 100644 index 00000000000..f244b20e3e0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SimpleExponentialTaskRuntimeEstimator.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.speculate; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.speculate.forecast.SimpleExponentialSmoothing; + +/** + * A task Runtime Estimator based on exponential smoothing. + */ +public class SimpleExponentialTaskRuntimeEstimator extends StartEndTimesBase { + private final static long DEFAULT_ESTIMATE_RUNTIME = -1L; + + /** + * Constant time used to calculate the smoothing exponential factor. + */ + private long constTime; + + /** + * Number of readings before we consider the estimate stable. + * Otherwise, the estimate will be skewed due to the initial estimate + */ + private int skipCount; + + /** + * Time window to automatically update the count of the skipCount. This is + * needed when a task stalls without any progress, causing the estimator to + * return -1 as an estimatedRuntime. + */ + private long stagnatedWindow; + + private final ConcurrentMap> + estimates = new ConcurrentHashMap<>(); + + private SimpleExponentialSmoothing getForecastEntry(TaskAttemptId attemptID) { + AtomicReference entryRef = estimates + .get(attemptID); + if (entryRef == null) { + return null; + } + return entryRef.get(); + } + + private void incorporateReading(TaskAttemptId attemptID, + float newRawData, long newTimeStamp) { + SimpleExponentialSmoothing foreCastEntry = getForecastEntry(attemptID); + if (foreCastEntry == null) { + Long tStartTime = startTimes.get(attemptID); + // skip if the startTime is not set yet + if(tStartTime == null) { + return; + } + estimates.putIfAbsent(attemptID, + new AtomicReference<>(SimpleExponentialSmoothing.createForecast( + constTime, skipCount, stagnatedWindow, + tStartTime))); + incorporateReading(attemptID, newRawData, newTimeStamp); + return; + } + foreCastEntry.incorporateReading(newTimeStamp, newRawData); + } + + @Override + public void contextualize(Configuration conf, AppContext context) { + super.contextualize(conf, context); + + constTime + = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS, + MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS); + + stagnatedWindow = Math.max(2 * constTime, conf.getLong( + MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS, + MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS)); + + skipCount = conf + .getInt(MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS, + MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS); + } + + @Override + public long estimatedRuntime(TaskAttemptId id) { + SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id); + if (foreCastEntry == null) { + return DEFAULT_ESTIMATE_RUNTIME; + } + // TODO: What should we do when estimate is zero + double remainingWork = Math.min(1.0, 1.0 - foreCastEntry.getRawData()); + double forecast = foreCastEntry.getForecast(); + if (forecast <= 0.0) { + return DEFAULT_ESTIMATE_RUNTIME; + } + long remainingTime = (long)(remainingWork / forecast); + long estimatedRuntime = remainingTime + + foreCastEntry.getTimeStamp() + - foreCastEntry.getStartTime(); + return estimatedRuntime; + } + + @Override + public long estimatedNewAttemptRuntime(TaskId id) { + DataStatistics statistics = dataStatisticsForTask(id); + + if (statistics == null) { + return -1L; + } + + double statsMeanCI = statistics.meanCI(); + double expectedVal = + statsMeanCI + Math.min(statsMeanCI * 0.25, statistics.std() / 2); + return (long)(expectedVal); + } + + @Override + public boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) { + SimpleExponentialSmoothing foreCastEntry = getForecastEntry(id); + if(foreCastEntry == null) { + return false; + } + return foreCastEntry.isDataStagnated(timeStamp); + } + + @Override + public long runtimeEstimateVariance(TaskAttemptId id) { + SimpleExponentialSmoothing forecastEntry = getForecastEntry(id); + if (forecastEntry == null) { + return DEFAULT_ESTIMATE_RUNTIME; + } + double forecast = forecastEntry.getForecast(); + if (forecastEntry.isDefaultForecast(forecast)) { + return DEFAULT_ESTIMATE_RUNTIME; + } + //TODO: What is the best way to measure variance in runtime + return 0L; + } + + @Override + public void updateAttempt(TaskAttemptStatus status, long timestamp) { + super.updateAttempt(status, timestamp); + TaskAttemptId attemptID = status.id; + + float progress = status.progress; + + incorporateReading(attemptID, progress, timestamp); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java index aee4821996f..8eda00528f1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/StartEndTimesBase.java @@ -152,8 +152,7 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator { if (statistics == null) { return -1L; } - - return (long)statistics.mean(); + return (long) statistics.mean(); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java index ce4825ff225..4ae18758e40 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/TaskRuntimeEstimator.java @@ -87,4 +87,19 @@ public interface TaskRuntimeEstimator { * */ public long runtimeEstimateVariance(TaskAttemptId id); + + /** + * + * Returns true if the estimator has no updates records for a threshold time + * window. This helps to identify task attempts that are stalled at the + * beginning of execution. + * + * @param id the {@link TaskAttemptId} of the attempt we are asking about + * @param timeStamp the time of the report we compare with + * @return true if the task attempt has no progress for a given time window + * + */ + default boolean hasStagnatedProgress(TaskAttemptId id, long timeStamp) { + return false; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java new file mode 100644 index 00000000000..e1ef7bec907 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/SimpleExponentialSmoothing.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.speculate.forecast; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Implementation of the static model for Simple exponential smoothing. + */ +public class SimpleExponentialSmoothing { + public final static double DEFAULT_FORECAST = -1.0; + private final int kMinimumReads; + private final long kStagnatedWindow; + private final long startTime; + private long timeConstant; + + private AtomicReference forecastRefEntry; + + public static SimpleExponentialSmoothing createForecast(long timeConstant, + int skipCnt, long stagnatedWindow, long timeStamp) { + return new SimpleExponentialSmoothing(timeConstant, skipCnt, + stagnatedWindow, timeStamp); + } + + SimpleExponentialSmoothing(long ktConstant, int skipCnt, + long stagnatedWindow, long timeStamp) { + kMinimumReads = skipCnt; + kStagnatedWindow = stagnatedWindow; + this.timeConstant = ktConstant; + this.startTime = timeStamp; + this.forecastRefEntry = new AtomicReference(null); + } + + private class ForecastRecord { + private double alpha; + private long timeStamp; + private double sample; + private double rawData; + private double forecast; + private double sseError; + private long myIndex; + + ForecastRecord(double forecast, double rawData, long timeStamp) { + this(0.0, forecast, rawData, forecast, timeStamp, 0.0, 0); + } + + ForecastRecord(double alpha, double sample, double rawData, + double forecast, long timeStamp, double accError, long index) { + this.timeStamp = timeStamp; + this.alpha = alpha; + this.sseError = 0.0; + this.sample = sample; + this.forecast = forecast; + this.rawData = rawData; + this.sseError = accError; + this.myIndex = index; + } + + private double preProcessRawData(double rData, long newTime) { + return processRawData(this.rawData, this.timeStamp, rData, newTime); + } + + public ForecastRecord append(long newTimeStamp, double rData) { + if (this.timeStamp > newTimeStamp) { + return this; + } + double newSample = preProcessRawData(rData, newTimeStamp); + long deltaTime = this.timeStamp - newTimeStamp; + if (this.myIndex == kMinimumReads) { + timeConstant = Math.max(timeConstant, newTimeStamp - startTime); + } + double smoothFactor = + 1 - Math.exp(((double) deltaTime) / timeConstant); + double forecastVal = + smoothFactor * newSample + (1.0 - smoothFactor) * this.forecast; + double newSSEError = + this.sseError + Math.pow(newSample - this.forecast, 2); + return new ForecastRecord(smoothFactor, newSample, rData, forecastVal, + newTimeStamp, newSSEError, this.myIndex + 1); + } + + } + + public boolean isDataStagnated(long timeStamp) { + ForecastRecord rec = forecastRefEntry.get(); + if (rec != null && rec.myIndex <= kMinimumReads) { + return (rec.timeStamp + kStagnatedWindow) < timeStamp; + } + return false; + } + + static double processRawData(double oldRawData, long oldTime, + double newRawData, long newTime) { + double rate = (newRawData - oldRawData) / (newTime - oldTime); + return rate; + } + + public void incorporateReading(long timeStamp, double rawData) { + ForecastRecord oldRec = forecastRefEntry.get(); + if (oldRec == null) { + double oldForecast = + processRawData(0, startTime, rawData, timeStamp); + forecastRefEntry.compareAndSet(null, + new ForecastRecord(oldForecast, 0.0, startTime)); + incorporateReading(timeStamp, rawData); + return; + } + while (!forecastRefEntry.compareAndSet(oldRec, oldRec.append(timeStamp, + rawData))) { + oldRec = forecastRefEntry.get(); + } + + } + + public double getForecast() { + ForecastRecord rec = forecastRefEntry.get(); + if (rec != null && rec.myIndex > kMinimumReads) { + return rec.forecast; + } + return DEFAULT_FORECAST; + } + + public boolean isDefaultForecast(double value) { + return value == DEFAULT_FORECAST; + } + + public double getSSE() { + ForecastRecord rec = forecastRefEntry.get(); + if (rec != null) { + return rec.sseError; + } + return DEFAULT_FORECAST; + } + + public boolean isErrorWithinBound(double bound) { + double squaredErr = getSSE(); + if (squaredErr < 0) { + return false; + } + return bound > squaredErr; + } + + public double getRawData() { + ForecastRecord rec = forecastRefEntry.get(); + if (rec != null) { + return rec.rawData; + } + return DEFAULT_FORECAST; + } + + public long getTimeStamp() { + ForecastRecord rec = forecastRefEntry.get(); + if (rec != null) { + return rec.timeStamp; + } + return 0L; + } + + public long getStartTime() { + return startTime; + } + + public AtomicReference getForecastRefEntry() { + return forecastRefEntry; + } + + @Override + public String toString() { + String res = "NULL"; + ForecastRecord rec = forecastRefEntry.get(); + if (rec != null) { + res = "rec.index = " + rec.myIndex + ", forecast t: " + rec.timeStamp + + ", forecast: " + rec.forecast + + ", sample: " + rec.sample + ", raw: " + rec.rawData + ", error: " + + rec.sseError + ", alpha: " + rec.alpha; + } + return res; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index a846a532e36..2dc92e50964 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator; import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator; @@ -257,6 +258,13 @@ public class TestRuntimeEstimators { coreTestEstimator(specificEstimator, 3); } + @Test + public void testSimpleExponentialEstimator() throws Exception { + TaskRuntimeEstimator specificEstimator + = new SimpleExponentialTaskRuntimeEstimator(); + coreTestEstimator(specificEstimator, 3); + } + int taskTypeSlots(TaskType type) { return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java new file mode 100644 index 00000000000..b669df765ba --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/speculate/forecast/TestSimpleExponentialForecast.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.speculate.forecast; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.junit.Assert; +import org.junit.Test; + +/** + * Testing the statistical model of simple exponential estimator. + */ +public class TestSimpleExponentialForecast { + private static final Log LOG = + LogFactory.getLog(TestSimpleExponentialForecast.class); + + private static long clockTicks = 1000L; + private ControlledClock clock; + + private int incTestSimpleExponentialForecast() { + clock = new ControlledClock(); + clock.tickMsec(clockTicks); + SimpleExponentialSmoothing forecaster = + new SimpleExponentialSmoothing(10000, + 12, 10000, clock.getTime()); + + + double progress = 0.0; + + while(progress <= 1.0) { + clock.tickMsec(clockTicks); + forecaster.incorporateReading(clock.getTime(), progress); + LOG.info("progress: " + progress + " --> " + forecaster.toString()); + progress += 0.005; + } + + return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1; + } + + + private int decTestSimpleExponentialForecast() { + clock = new ControlledClock(); + clock.tickMsec(clockTicks); + SimpleExponentialSmoothing forecaster = + new SimpleExponentialSmoothing(800, + 12, 10000, clock.getTime()); + + double progress = 0.0; + + double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.001}; + while(progress <= 1.0) { + clock.tickMsec(clockTicks); + forecaster.incorporateReading(clock.getTime(), progress); + LOG.info("progress: " + progress + " --> " + forecaster.toString()); + progress += progressRates[(int)(progress / 0.25)]; + } + + return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1; + } + + private int zeroTestSimpleExponentialForecast() { + clock = new ControlledClock(); + clock.tickMsec(clockTicks); + SimpleExponentialSmoothing forecaster = + new SimpleExponentialSmoothing(800, + 12, 10000, clock.getTime()); + + double progress = 0.0; + + double[] progressRates = new double[]{0.005, 0.004, 0.002, 0.0, 0.003}; + int progressInd = 0; + while(progress <= 1.0) { + clock.tickMsec(clockTicks); + forecaster.incorporateReading(clock.getTime(), progress); + LOG.info("progress: " + progress + " --> " + forecaster.toString()); + int currInd = progressInd++ > 1000 ? 4 : (int)(progress / 0.25); + progress += progressRates[currInd]; + } + + return forecaster.getSSE() < Math.pow(10.0, -6) ? 0 : 1; + } + + @Test + public void testSimpleExponentialForecastLinearInc() throws Exception { + int res = incTestSimpleExponentialForecast(); + Assert.assertEquals("We got the wrong estimate from simple exponential.", + res, 0); + } + + @Test + public void testSimpleExponentialForecastLinearDec() throws Exception { + int res = decTestSimpleExponentialForecast(); + Assert.assertEquals("We got the wrong estimate from simple exponential.", + res, 0); + } + + @Test + public void testSimpleExponentialForecastZeros() throws Exception { + int res = zeroTestSimpleExponentialForecast(); + Assert.assertEquals("We got the wrong estimate from simple exponential.", + res, 0); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index ca18bfe5a2e..4142c0175e6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -824,6 +824,37 @@ public interface MRJobConfig { public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE = MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate"; + /** The lambda value in the smoothing function of the task estimator.*/ + String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS = + MR_AM_PREFIX + + "job.task.estimator.simple.exponential.smooth.lambda-ms"; + long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS = 1000L * 120; + + /** + * The window length in the simple exponential smoothing that considers the + * task attempt is stagnated. + */ + String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS = + MR_AM_PREFIX + + "job.task.estimator.simple.exponential.smooth.stagnated-ms"; + long DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_STAGNATED_MS = + 1000L * 360; + + /** + * The number of initial readings that the estimator ignores before giving a + * prediction. At the beginning the smooth estimator won't be accurate in + * prediction. + */ + String MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS = + MR_AM_PREFIX + + "job.task.estimator.simple.exponential.smooth.skip-initials"; + + /** + * The default number of reading the estimators is going to ignore before + * returning the smooth exponential prediction. + */ + int DEFAULT_MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_INITIALS = 24; + /** The number of threads used to handle task RPC calls.*/ public static final String MR_AM_TASK_LISTENER_THREAD_COUNT = MR_AM_PREFIX + "job.task.listener.thread-count"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java new file mode 100644 index 00000000000..02e4358a075 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.java @@ -0,0 +1,935 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Test speculation on Mini Cluster. + */ +@Ignore +@RunWith(Parameterized.class) +public class TestSpeculativeExecOnCluster { + private static final Log LOG = LogFactory + .getLog(TestSpeculativeExecOnCluster.class); + + private static final int NODE_MANAGERS_COUNT = 2; + private static final boolean ENABLE_SPECULATIVE_MAP = true; + private static final boolean ENABLE_SPECULATIVE_REDUCE = true; + + private static final int NUM_MAP_DEFAULT = 8 * NODE_MANAGERS_COUNT; + private static final int NUM_REDUCE_DEFAULT = NUM_MAP_DEFAULT / 2; + private static final int MAP_SLEEP_TIME_DEFAULT = 60000; + private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000; + private static final int MAP_SLEEP_COUNT_DEFAULT = 10000; + private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000; + + private static final String MAP_SLEEP_COUNT = + "mapreduce.sleepjob.map.sleep.count"; + private static final String REDUCE_SLEEP_COUNT = + "mapreduce.sleepjob.reduce.sleep.count"; + private static final String MAP_SLEEP_TIME = + "mapreduce.sleepjob.map.sleep.time"; + private static final String REDUCE_SLEEP_TIME = + "mapreduce.sleepjob.reduce.sleep.time"; + private static final String MAP_SLEEP_CALCULATOR_TYPE = + "mapreduce.sleepjob.map.sleep.time.calculator"; + private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run"; + + private static Map mapSleepTypeMapper; + + + private static FileSystem localFs; + + static { + mapSleepTypeMapper = new HashMap<>(); + mapSleepTypeMapper.put("normal_run", new SleepDurationCalcImpl()); + mapSleepTypeMapper.put("stalled_run", + new StalledSleepDurationCalcImpl()); + mapSleepTypeMapper.put("slowing_run", + new SlowingSleepDurationCalcImpl()); + mapSleepTypeMapper.put("dynamic_slowing_run", + new DynamicSleepDurationCalcImpl()); + mapSleepTypeMapper.put("step_stalled_run", + new StepStalledSleepDurationCalcImpl()); + try { + localFs = FileSystem.getLocal(new Configuration()); + } catch (IOException io) { + throw new RuntimeException("problem getting local fs", io); + } + } + + private static final Path TEST_ROOT_DIR = + new Path("target", + TestSpeculativeExecOnCluster.class.getName() + "-tmpDir") + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); + private static final Path TEST_OUT_DIR = + new Path(TEST_ROOT_DIR, "test.out.dir"); + + private MiniMRYarnCluster mrCluster; + + private int myNumMapper; + private int myNumReduce; + private int myMapSleepTime; + private int myReduceSleepTime; + private int myMapSleepCount; + private int myReduceSleepCount; + private String chosenSleepCalc; + private Class estimatorClass; + + + /** + * The test cases take a long time to run all the estimators against all the + * cases. We skip the legacy estimators to reduce the execution time. + */ + private List ignoredTests; + + + @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})") + public static Collection getTestParameters() { + List ignoredTests = Arrays.asList(new String[] { + "stalled_run", + "slowing_run", + "step_stalled_run" + }); + return Arrays.asList(new Object[][] { + {SimpleExponentialTaskRuntimeEstimator.class, ignoredTests, + NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT}, + {LegacyTaskRuntimeEstimator.class, ignoredTests, + NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT} + }); + } + + public TestSpeculativeExecOnCluster( + Class estimatorKlass, + List testToIgnore, + Integer numMapper, + Integer numReduce) { + this.ignoredTests = testToIgnore; + this.estimatorClass = estimatorKlass; + this.myNumMapper = numMapper; + this.myNumReduce = numReduce; + + } + + @Before + public void setup() throws IOException { + + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + if (mrCluster == null) { + mrCluster = new MiniMRYarnCluster( + TestSpeculativeExecution.class.getName(), NODE_MANAGERS_COUNT); + Configuration conf = new Configuration(); + mrCluster.init(conf); + mrCluster.start(); + + } + + // workaround the absent public distcache. + localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR); + localFs.setPermission(APP_JAR, new FsPermission("700")); + myMapSleepTime = MAP_SLEEP_TIME_DEFAULT; + myReduceSleepTime = REDUCE_SLEEP_TIME_DEFAULT; + myMapSleepCount = MAP_SLEEP_COUNT_DEFAULT; + myReduceSleepCount = REDUCE_SLEEP_COUNT_DEFAULT; + chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT; + } + + @After + public void tearDown() { + if (mrCluster != null) { + mrCluster.stop(); + mrCluster = null; + } + } + + /** + * Overrides default behavior of Partitioner for testing. + */ + public static class SpeculativeSleepJobPartitioner extends + Partitioner { + public int getPartition(IntWritable k, NullWritable v, int numPartitions) { + return k.get() % numPartitions; + } + } + + /** + * Overrides default behavior of InputSplit for testing. + */ + public static class EmptySplit extends InputSplit implements Writable { + public void write(DataOutput out) throws IOException { } + public void readFields(DataInput in) throws IOException { } + public long getLength() { + return 0L; + } + public String[] getLocations() { + return new String[0]; + } + } + + /** + * Input format that sleeps after updating progress. + */ + public static class SpeculativeSleepInputFormat + extends InputFormat { + + public List getSplits(JobContext jobContext) { + List ret = new ArrayList(); + int numSplits = jobContext.getConfiguration(). + getInt(MRJobConfig.NUM_MAPS, 1); + for (int i = 0; i < numSplits; ++i) { + ret.add(new EmptySplit()); + } + return ret; + } + + public RecordReader createRecordReader( + InputSplit ignored, TaskAttemptContext taskContext) + throws IOException { + Configuration conf = taskContext.getConfiguration(); + final int count = conf.getInt(MAP_SLEEP_COUNT, MAP_SLEEP_COUNT_DEFAULT); + if (count < 0) { + throw new IOException("Invalid map count: " + count); + } + final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, + REDUCE_SLEEP_COUNT_DEFAULT); + if (redcount < 0) { + throw new IOException("Invalid reduce count: " + redcount); + } + final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks()); + + return new RecordReader() { + private int records = 0; + private int emitCount = 0; + private IntWritable key = null; + private IntWritable value = null; + public void initialize(InputSplit split, TaskAttemptContext context) { + } + + public boolean nextKeyValue() + throws IOException { + if (count == 0) { + return false; + } + key = new IntWritable(); + key.set(emitCount); + int emit = emitPerMapTask / count; + if ((emitPerMapTask) % count > records) { + ++emit; + } + emitCount += emit; + value = new IntWritable(); + value.set(emit); + return records++ < count; + } + public IntWritable getCurrentKey() { + return key; + } + public IntWritable getCurrentValue() { + return value; + } + public void close() throws IOException { } + public float getProgress() throws IOException { + return count == 0 ? 100 : records / ((float)count); + } + }; + } + } + + /** + * Interface used to simulate different progress rates of the tasks. + */ + public interface SleepDurationCalculator { + long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, + long defaultSleepDuration); + } + + /** + * All tasks have the same progress. + */ + public static class SleepDurationCalcImpl implements SleepDurationCalculator { + + private double threshold = 1.0; + private double slowFactor = 1.0; + + SleepDurationCalcImpl() { + + } + + public long calcSleepDuration(TaskAttemptID taId, int currCount, + int totalCount, long defaultSleepDuration) { + if (threshold <= ((double) currCount) / totalCount) { + return (long) (slowFactor * defaultSleepDuration); + } + return defaultSleepDuration; + } + } + + /** + * The first attempt of task_0 slows down by a small factor that should not + * trigger a speculation. An speculated attempt should never beat the + * original task. + * A conservative estimator/speculator will speculate another attempt + * because of the slower progress. + */ + public static class SlowingSleepDurationCalcImpl implements + SleepDurationCalculator { + + private double threshold = 0.4; + private double slowFactor = 1.2; + + SlowingSleepDurationCalcImpl() { + + } + + public long calcSleepDuration(TaskAttemptID taId, int currCount, + int totalCount, long defaultSleepDuration) { + if ((taId.getTaskType() == TaskType.MAP) + && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) { + if (threshold <= ((double) currCount) / totalCount) { + return (long) (slowFactor * defaultSleepDuration); + } + } + return defaultSleepDuration; + } + } + + /** + * The progress of the first Mapper task is stalled by 100 times the other + * tasks. + * The speculated attempt should be succeed if the estimator detects + * the slow down on time. + */ + public static class StalledSleepDurationCalcImpl implements + SleepDurationCalculator { + + StalledSleepDurationCalcImpl() { + + } + + public long calcSleepDuration(TaskAttemptID taId, int currCount, + int totalCount, long defaultSleepDuration) { + if ((taId.getTaskType() == TaskType.MAP) + && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) { + return 1000 * defaultSleepDuration; + } + return defaultSleepDuration; + } + } + + + /** + * Emulates the behavior with a step change in the progress. + */ + public static class StepStalledSleepDurationCalcImpl implements + SleepDurationCalculator { + + private double threshold = 0.4; + private double slowFactor = 10000; + + StepStalledSleepDurationCalcImpl() { + + } + + public long calcSleepDuration(TaskAttemptID taId, int currCount, + int totalCount, long defaultSleepDuration) { + if ((taId.getTaskType() == TaskType.MAP) + && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) { + if (threshold <= ((double) currCount) / totalCount) { + return (long) (slowFactor * defaultSleepDuration); + } + } + return defaultSleepDuration; + } + } + + /** + * Dynamically slows down the progress of the first Mapper task. + * The speculated attempt should be succeed if the estimator detects + * the slow down on time. + */ + public static class DynamicSleepDurationCalcImpl implements + SleepDurationCalculator { + + private double[] thresholds; + private double[] slowFactors; + + DynamicSleepDurationCalcImpl() { + thresholds = new double[] { + 0.1, 0.25, 0.4, 0.5, 0.6, 0.65, 0.7, 0.8, 0.9 + }; + slowFactors = new double[] { + 2.0, 4.0, 5.0, 6.0, 10.0, 15.0, 20.0, 25.0, 30.0 + }; + } + + public long calcSleepDuration(TaskAttemptID taId, int currCount, + int totalCount, + long defaultSleepDuration) { + if ((taId.getTaskType() == TaskType.MAP) + && (taId.getTaskID().getId() == 0) && (taId.getId() == 0)) { + double currProgress = ((double) currCount) / totalCount; + double slowFactor = 1.0; + for (int i = 0; i < thresholds.length; i++) { + if (thresholds[i] >= currProgress) { + break; + } + slowFactor = slowFactors[i]; + } + return (long) (slowFactor * defaultSleepDuration); + } + return defaultSleepDuration; + } + } + + /** + * Dummy class for testing Speculation. Sleeps for a defined period + * of time in mapper. Generates fake input for map / reduce + * jobs. Note that generated number of input pairs is in the order + * of numMappers * mapSleepTime / 100, so the job uses + * some disk space. + * The sleep duration for a given task is going to slowDown to evaluate + * the estimator + */ + public static class SpeculativeSleepMapper + extends Mapper { + private long mapSleepDuration = MAP_SLEEP_TIME_DEFAULT; + private int mapSleepCount = 1; + private int count = 0; + private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl(); + + protected void setup(Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + this.mapSleepCount = + conf.getInt(MAP_SLEEP_COUNT, mapSleepCount); + this.mapSleepDuration = mapSleepCount == 0 ? 0 : + conf.getLong(MAP_SLEEP_TIME, MAP_SLEEP_TIME_DEFAULT) / mapSleepCount; + this.sleepCalc = + mapSleepTypeMapper.get(conf.get(MAP_SLEEP_CALCULATOR_TYPE, + MAP_SLEEP_CALCULATOR_TYPE_DEFAULT)); + + } + + public void map(IntWritable key, IntWritable value, Context context) + throws IOException, InterruptedException { + //it is expected that every map processes mapSleepCount number of records. + try { + context.setStatus("Sleeping... (" + + (mapSleepDuration * (mapSleepCount - count)) + ") ms left"); + long sleepTime = sleepCalc.calcSleepDuration(context.getTaskAttemptID(), + count, mapSleepCount, + mapSleepDuration); + Thread.sleep(sleepTime); + } catch (InterruptedException ex) { + throw (IOException) new IOException( + "Interrupted while sleeping").initCause(ex); + } + ++count; + // output reduceSleepCount * numReduce number of random values, so that + // each reducer will get reduceSleepCount number of keys. + int k = key.get(); + for (int i = 0; i < value.get(); ++i) { + context.write(new IntWritable(k + i), NullWritable.get()); + } + } + } + + /** + * Implementation of the reducer task for testing. + */ + public static class SpeculativeSleepReducer + extends Reducer { + + private long reduceSleepDuration = REDUCE_SLEEP_TIME_DEFAULT; + private int reduceSleepCount = 1; + private int count = 0; + + protected void setup(Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + this.reduceSleepCount = + conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount); + this.reduceSleepDuration = reduceSleepCount == 0 ? 0 : + conf.getLong(REDUCE_SLEEP_TIME, REDUCE_SLEEP_TIME_DEFAULT) + / reduceSleepCount; + } + + public void reduce(IntWritable key, Iterable values, + Context context) + throws IOException { + try { + context.setStatus("Sleeping... (" + + (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left"); + Thread.sleep(reduceSleepDuration); + } catch (InterruptedException ex) { + throw (IOException) new IOException( + "Interrupted while sleeping").initCause(ex); + } + count++; + } + } + + /** + * A class used to map the estimatopr implementation to the expected + * test results. + */ + class EstimatorMetricsPair { + + private Class estimatorClass; + private int expectedMapTasks; + private int expectedReduceTasks; + private boolean speculativeEstimator; + + EstimatorMetricsPair(Class estimatorClass, int mapTasks, int reduceTasks, + boolean isToSpeculate) { + this.estimatorClass = estimatorClass; + this.expectedMapTasks = mapTasks; + this.expectedReduceTasks = reduceTasks; + this.speculativeEstimator = isToSpeculate; + } + + boolean didSpeculate(Counters counters) { + long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue(); + long launchedReduce = counters + .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue(); + boolean isSpeculated = + (launchedMaps > expectedMapTasks + || launchedReduce > expectedReduceTasks); + return isSpeculated; + } + + String getErrorMessage(Counters counters) { + String msg = "Unexpected tasks running estimator " + + estimatorClass.getName() + "\n\t"; + long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue(); + long launchedReduce = counters + .findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue(); + if (speculativeEstimator) { + if (launchedMaps < expectedMapTasks) { + msg += "maps " + launchedMaps + ", expected: " + expectedMapTasks; + } + if (launchedReduce < expectedReduceTasks) { + msg += ", reduces " + launchedReduce + ", expected: " + + expectedReduceTasks; + } + } else { + if (launchedMaps > expectedMapTasks) { + msg += "maps " + launchedMaps + ", expected: " + expectedMapTasks; + } + if (launchedReduce > expectedReduceTasks) { + msg += ", reduces " + launchedReduce + ", expected: " + + expectedReduceTasks; + } + } + return msg; + } + } + + @Test + public void testExecDynamicSlowingSpeculative() throws Exception { + /*------------------------------------------------------------------ + * Test that Map/Red speculates because: + * 1- all tasks have same progress rate except for task_0 + * 2- task_0 slows down by dynamic increasing factor + * 3- A good estimator should readjust the estimation and the speculator + * launches a new task. + * + * Expected: + * A- SimpleExponentialTaskRuntimeEstimator: speculates a successful + * attempt to beat the slowing task_0 + * B- LegacyTaskRuntimeEstimator: speculates an attempt + * C- ExponentiallySmoothedTaskRuntimeEstimator: Fails to detect the slow + * down and never speculates but it may speculate other tasks + * (mappers or reducers) + * ----------------------------------------------------------------- + */ + chosenSleepCalc = "dynamic_slowing_run"; + + if (ignoredTests.contains(chosenSleepCalc)) { + return; + } + + EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] { + new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair( + ExponentiallySmoothedTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true) + }; + + for (EstimatorMetricsPair specEstimator : estimatorPairs) { + if (!estimatorClass.equals(specEstimator.estimatorClass)) { + continue; + } + LOG.info("+++ Dynamic Slow Progress testing against " + estimatorClass + .getName() + " +++"); + Job job = runSpecTest(); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue( + "Job expected to succeed with estimator " + estimatorClass.getName(), + succeeded); + Assert.assertEquals( + "Job expected to succeed with estimator " + estimatorClass.getName(), + JobStatus.State.SUCCEEDED, job.getJobState()); + Counters counters = job.getCounters(); + + String errorMessage = specEstimator.getErrorMessage(counters); + boolean didSpeculate = specEstimator.didSpeculate(counters); + Assert.assertEquals(errorMessage, didSpeculate, + specEstimator.speculativeEstimator); + Assert + .assertEquals("Failed maps higher than 0 " + estimatorClass.getName(), + 0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue()); + } + } + + + @Test + public void testExecSlowNonSpeculative() throws Exception { + /*------------------------------------------------------------------ + * Test that Map/Red does not speculate because: + * 1- all tasks have same progress rate except for task_0 + * 2- task_0 slows down by 0.5 after 50% of the workload + * 3- A good estimator may adjust the estimation that the task will finish + * sooner than a new speculated task. + * + * Expected: + * A- SimpleExponentialTaskRuntimeEstimator: does not speculate because + * the new attempt estimated end time is not going to be smaller than the + * original end time. + * B- LegacyTaskRuntimeEstimator: speculates an attempt + * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates an attempt. + * ----------------------------------------------------------------- + */ + chosenSleepCalc = "slowing_run"; + + if (ignoredTests.contains(chosenSleepCalc)) { + return; + } + + EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] { + new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, false), + new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair( + ExponentiallySmoothedTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true) + }; + + for (EstimatorMetricsPair specEstimator : estimatorPairs) { + if (!estimatorClass.equals(specEstimator.estimatorClass)) { + continue; + } + LOG.info("+++ Linear Slow Progress Non Speculative testing against " + + estimatorClass.getName() + " +++"); + Job job = runSpecTest(); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue( + "Job expected to succeed with estimator " + estimatorClass.getName(), + succeeded); + Assert.assertEquals( + "Job expected to succeed with estimator " + estimatorClass.getName(), + JobStatus.State.SUCCEEDED, job.getJobState()); + Counters counters = job.getCounters(); + + String errorMessage = specEstimator.getErrorMessage(counters); + boolean didSpeculate = specEstimator.didSpeculate(counters); + Assert.assertEquals(errorMessage, didSpeculate, + specEstimator.speculativeEstimator); + Assert + .assertEquals("Failed maps higher than 0 " + estimatorClass.getName(), + 0, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue()); + } + } + + @Test + public void testExecStepStalledSpeculative() throws Exception { + /*------------------------------------------------------------------ + * Test that Map/Red speculates because: + * 1- all tasks have same progress rate except for task_0 + * 2- task_0 has long sleep duration + * 3- A good estimator may adjust the estimation that the task will finish + * sooner than a new speculated task. + * + * Expected: + * A- SimpleExponentialTaskRuntimeEstimator: speculates + * B- LegacyTaskRuntimeEstimator: speculates + * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates + * ----------------------------------------------------------------- + */ + chosenSleepCalc = "step_stalled_run"; + if (ignoredTests.contains(chosenSleepCalc)) { + return; + } + EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] { + new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair( + ExponentiallySmoothedTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true) + }; + + for (EstimatorMetricsPair specEstimator : estimatorPairs) { + if (!estimatorClass.equals(specEstimator.estimatorClass)) { + continue; + } + LOG.info("+++ Stalled Progress testing against " + + estimatorClass.getName() + " +++"); + Job job = runSpecTest(); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue("Job expected to succeed with estimator " + + estimatorClass.getName(), succeeded); + Assert.assertEquals("Job expected to succeed with estimator " + + estimatorClass.getName(), JobStatus.State.SUCCEEDED, + job.getJobState()); + Counters counters = job.getCounters(); + + String errorMessage = specEstimator.getErrorMessage(counters); + boolean didSpeculate = specEstimator.didSpeculate(counters); + Assert.assertEquals(errorMessage, didSpeculate, + specEstimator.speculativeEstimator); + Assert.assertEquals("Failed maps higher than 0 " + + estimatorClass.getName(), 0, + counters.findCounter(JobCounter.NUM_FAILED_MAPS) + .getValue()); + } + } + + @Test + public void testExecStalledSpeculative() throws Exception { + /*------------------------------------------------------------------ + * Test that Map/Red speculates because: + * 1- all tasks have same progress rate except for task_0 + * 2- task_0 has long sleep duration + * 3- A good estimator may adjust the estimation that the task will finish + * sooner than a new speculated task. + * + * Expected: + * A- SimpleExponentialTaskRuntimeEstimator: speculates + * B- LegacyTaskRuntimeEstimator: speculates + * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates + * ----------------------------------------------------------------- + */ + chosenSleepCalc = "stalled_run"; + + if (ignoredTests.contains(chosenSleepCalc)) { + return; + } + EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] { + new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair( + ExponentiallySmoothedTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true) + }; + + for (EstimatorMetricsPair specEstimator : estimatorPairs) { + if (!estimatorClass.equals(specEstimator.estimatorClass)) { + continue; + } + LOG.info("+++ Stalled Progress testing against " + + estimatorClass.getName() + " +++"); + Job job = runSpecTest(); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue("Job expected to succeed with estimator " + + estimatorClass.getName(), succeeded); + Assert.assertEquals("Job expected to succeed with estimator " + + estimatorClass.getName(), JobStatus.State.SUCCEEDED, + job.getJobState()); + Counters counters = job.getCounters(); + + String errorMessage = specEstimator.getErrorMessage(counters); + boolean didSpeculate = specEstimator.didSpeculate(counters); + Assert.assertEquals(errorMessage, didSpeculate, + specEstimator.speculativeEstimator); + Assert.assertEquals("Failed maps higher than 0 " + + estimatorClass.getName(), 0, + counters.findCounter(JobCounter.NUM_FAILED_MAPS) + .getValue()); + } + } + + @Test + public void testExecNonSpeculative() throws Exception { + /*------------------------------------------------------------------ + * Test that Map/Red does not speculate because all tasks progress in the + * same rate. + * + * Expected: + * A- SimpleExponentialTaskRuntimeEstimator: does not speculate + * B- LegacyTaskRuntimeEstimator: speculates + * C- ExponentiallySmoothedTaskRuntimeEstimator: speculates + * ----------------------------------------------------------------- + */ + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + if (ignoredTests.contains(chosenSleepCalc)) { + return; + } + + EstimatorMetricsPair[] estimatorPairs = new EstimatorMetricsPair[] { + new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true), + new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, false), + new EstimatorMetricsPair( + ExponentiallySmoothedTaskRuntimeEstimator.class, + myNumMapper, myNumReduce, true) + }; + + for (EstimatorMetricsPair specEstimator : estimatorPairs) { + if (!estimatorClass.equals(specEstimator.estimatorClass)) { + continue; + } + LOG.info("+++ No Speculation testing against " + + estimatorClass.getName() + " +++"); + Job job = runSpecTest(); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue("Job expected to succeed with estimator " + + estimatorClass.getName(), succeeded); + Assert.assertEquals("Job expected to succeed with estimator " + + estimatorClass.getName(), JobStatus.State.SUCCEEDED, + job.getJobState()); + Counters counters = job.getCounters(); + + String errorMessage = specEstimator.getErrorMessage(counters); + boolean didSpeculate = specEstimator.didSpeculate(counters); + Assert.assertEquals(errorMessage, didSpeculate, + specEstimator.speculativeEstimator); + } + } + + private Job runSpecTest() + throws IOException, ClassNotFoundException, InterruptedException { + + Configuration conf = mrCluster.getConfig(); + conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, ENABLE_SPECULATIVE_MAP); + conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, ENABLE_SPECULATIVE_REDUCE); + conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, + estimatorClass, + TaskRuntimeEstimator.class); + conf.setLong(MAP_SLEEP_TIME, myMapSleepTime); + conf.setLong(REDUCE_SLEEP_TIME, myReduceSleepTime); + conf.setInt(MAP_SLEEP_COUNT, myMapSleepCount); + conf.setInt(REDUCE_SLEEP_COUNT, myReduceSleepCount); + conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F); + conf.setInt(MRJobConfig.NUM_MAPS, myNumMapper); + conf.set(MAP_SLEEP_CALCULATOR_TYPE, chosenSleepCalc); + Job job = Job.getInstance(conf); + job.setJarByClass(TestSpeculativeExecution.class); + job.setMapperClass(SpeculativeSleepMapper.class); + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setReducerClass(SpeculativeSleepReducer.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setInputFormatClass(SpeculativeSleepInputFormat.class); + job.setPartitionerClass(SpeculativeSleepJobPartitioner.class); + job.setNumReduceTasks(myNumReduce); + FileInputFormat.addInputPath(job, new Path("ignored")); + // Delete output directory if it exists. + try { + localFs.delete(TEST_OUT_DIR, true); + } catch (IOException e) { + // ignore + } + FileOutputFormat.setOutputPath(job, TEST_OUT_DIR); + + // Creates the Job Configuration + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.setMaxMapAttempts(2); + + job.submit(); + + return job; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index de171c75125..940f142fdf7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -18,12 +18,17 @@ package org.apache.hadoop.mapreduce.v2; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -48,13 +53,31 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Test; import com.google.common.base.Supplier; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; @SuppressWarnings({ "unchecked", "rawtypes" }) +@RunWith(Parameterized.class) public class TestSpeculativeExecutionWithMRApp { private static final int NUM_MAPPERS = 5; private static final int NUM_REDUCERS = 0; + @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][] { + {SimpleExponentialTaskRuntimeEstimator.class}, + {LegacyTaskRuntimeEstimator.class} + }); + } + + private Class estimatorClass; + + public TestSpeculativeExecutionWithMRApp( + Class estimatorKlass) { + this.estimatorClass = estimatorKlass; + } + @Test public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { @@ -64,7 +87,7 @@ public class TestSpeculativeExecutionWithMRApp { MRApp app = new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock); - Job job = app.submit(new Configuration(), true, true); + Job job = app.submit(createConfiguration(), true, true); app.waitForState(job, JobState.RUNNING); Map tasks = job.getTasks(); @@ -136,7 +159,7 @@ public class TestSpeculativeExecutionWithMRApp { MRApp app = new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock); - Job job = app.submit(new Configuration(), true, true); + Job job = app.submit(createConfiguration(), true, true); app.waitForState(job, JobState.RUNNING); Map tasks = job.getTasks(); @@ -191,6 +214,9 @@ public class TestSpeculativeExecutionWithMRApp { } clock.setTime(System.currentTimeMillis() + 15000); + // give a chance to the speculator thread to run a scan before we proceed + // with updating events + Thread.yield(); for (Map.Entry task : tasks.entrySet()) { for (Map.Entry taskAttempt : task.getValue() .getAttempts().entrySet()) { @@ -251,4 +277,20 @@ public class TestSpeculativeExecutionWithMRApp { status.taskState = state; return status; } + + private Configuration createConfiguration() { + Configuration conf = new Configuration(); + conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, + estimatorClass, + TaskRuntimeEstimator.class); + if (SimpleExponentialTaskRuntimeEstimator.class.equals(estimatorClass)) { + // set configurations specific to SimpleExponential estimator + conf.setInt( + MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_SKIP_INITIALS, 1); + conf.setLong( + MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS, + 1000L * 10); + } + return conf; + } }