diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ee31221d627..1b9ff4b9430 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -141,6 +141,10 @@ Release 2.1.2 - UNRELEASED MAPREDUCE-5442. $HADOOP_MAPRED_HOME/$HADOOP_CONF_DIR setting not working on Windows. (Yingda Chen via cnauroth) + MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that + aren't heart-beating for a while, so that we can aggressively speculate + instead of waiting for task-timeout (Xuan Gong via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java index 532a9a2ee73..80e38334730 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java @@ -78,6 +78,16 @@ public class DefaultSpeculator extends AbstractService implements private final Map pendingSpeculations = new ConcurrentHashMap(); + // Used to track any TaskAttempts that aren't heart-beating for a while, so + // that we can aggressively speculate instead of waiting for task-timeout. + private final ConcurrentMap + runningTaskAttemptStatistics = new ConcurrentHashMap(); + // Regular heartbeat from tasks is every 3 secs. So if we don't get a + // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change + // in progress. + private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000; + // These are the current needs, not the initial needs. For each job, these // record the number of attempts that exist and that are actively // waiting for a container [as opposed to running or finished] @@ -329,6 +339,9 @@ public class DefaultSpeculator extends AbstractService implements runningTasks.putIfAbsent(taskID, Boolean.TRUE); } else { runningTasks.remove(taskID, Boolean.TRUE); + if (!stateString.equals(TaskAttemptState.STARTING.name())) { + runningTaskAttemptStatistics.remove(attemptID); + } } } @@ -389,6 +402,33 @@ public class DefaultSpeculator extends AbstractService implements long estimatedReplacementEndTime = now + estimator.estimatedNewAttemptRuntime(taskID); + float progress = taskAttempt.getProgress(); + TaskAttemptHistoryStatistics data = + runningTaskAttemptStatistics.get(runningTaskAttemptID); + if (data == null) { + runningTaskAttemptStatistics.put(runningTaskAttemptID, + new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now)); + } else { + if (estimatedRunTime == data.getEstimatedRunTime() + && progress == data.getProgress()) { + // Previous stats are same as same stats + if (data.notHeartbeatedInAWhile(now)) { + // Stats have stagnated for a while, simulate heart-beat. + TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); + taskAttemptStatus.id = runningTaskAttemptID; + taskAttemptStatus.progress = progress; + taskAttemptStatus.taskState = taskAttempt.getState(); + // Now simulate the heart-beat + handleAttempt(taskAttemptStatus); + } + } else { + // Stats have changed - update our data structure + data.setEstimatedRunTime(estimatedRunTime); + data.setProgress(progress); + data.resetHeartBeatTime(now); + } + } + if (estimatedEndTime < now) { return PROGRESS_IS_GOOD; } @@ -511,4 +551,47 @@ public class DefaultSpeculator extends AbstractService implements // We'll try to issue one map and one reduce speculation per job per run return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation(); } + + static class TaskAttemptHistoryStatistics { + + private long estimatedRunTime; + private float progress; + private long lastHeartBeatTime; + + public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, + long nonProgressStartTime) { + this.estimatedRunTime = estimatedRunTime; + this.progress = progress; + resetHeartBeatTime(nonProgressStartTime); + } + + public long getEstimatedRunTime() { + return this.estimatedRunTime; + } + + public float getProgress() { + return this.progress; + } + + public void setEstimatedRunTime(long estimatedRunTime) { + this.estimatedRunTime = estimatedRunTime; + } + + public void setProgress(float progress) { + this.progress = progress; + } + + public boolean notHeartbeatedInAWhile(long now) { + if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) { + return false; + } else { + resetHeartBeatTime(now); + return true; + } + } + + public void resetHeartBeatTime(long lastHeartBeatTime) { + this.lastHeartBeatTime = lastHeartBeatTime; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 2a009955e3d..3a7e865c7bf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -263,16 +263,22 @@ public class MRApp extends MRAppMaster { } public Job submit(Configuration conf) throws Exception { + //TODO: fix the bug where the speculator gets events with + //not-fully-constructed objects. For now, disable speculative exec + return submit(conf, false, false); + } + + public Job submit(Configuration conf, boolean mapSpeculative, + boolean reduceSpeculative) throws Exception { String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation - .getCurrentUser().getShortUserName()); + .getCurrentUser().getShortUserName()); conf.set(MRJobConfig.USER_NAME, user); conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString()); conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true); - //TODO: fix the bug where the speculator gets events with - //not-fully-constructed objects. For now, disable speculative exec - LOG.info("****DISABLING SPECULATIVE EXECUTION*****"); - conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); - conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + // TODO: fix the bug where the speculator gets events with + // not-fully-constructed objects. For now, disable speculative exec + conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, mapSpeculative); + conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, reduceSpeculative); init(conf); start(); @@ -281,7 +287,7 @@ public class MRApp extends MRAppMaster { // Write job.xml String jobFile = MRApps.getJobFile(conf, user, - TypeConverter.fromYarn(job.getID())); + TypeConverter.fromYarn(job.getID())); LOG.info("Writing job conf to " + jobFile); new File(jobFile).getParentFile().mkdirs(); conf.writeXml(new FileOutputStream(jobFile)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java new file mode 100644 index 00000000000..37d09e0da38 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2; + +import java.util.Iterator; +import java.util.Map; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.ControlledClock; +import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.Test; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class TestSpeculativeExecutionWithMRApp { + + private static final int NUM_MAPPERS = 5; + private static final int NUM_REDUCERS = 0; + + @Test(timeout = 60000) + public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { + + Clock actualClock = new SystemClock(); + ControlledClock clock = new ControlledClock(actualClock); + clock.setTime(System.currentTimeMillis()); + + MRApp app = + new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock); + Job job = app.submit(new Configuration(), true, true); + app.waitForState(job, JobState.RUNNING); + + Map tasks = job.getTasks(); + Assert.assertEquals("Num tasks is not correct", NUM_MAPPERS + NUM_REDUCERS, + tasks.size()); + Iterator taskIter = tasks.values().iterator(); + while (taskIter.hasNext()) { + app.waitForState(taskIter.next(), TaskState.RUNNING); + } + + // Process the update events + clock.setTime(System.currentTimeMillis() + 2000); + EventHandler appEventHandler = app.getContext().getEventHandler(); + for (Map.Entry mapTask : tasks.entrySet()) { + for (Map.Entry taskAttempt : mapTask + .getValue().getAttempts().entrySet()) { + TaskAttemptStatus status = + createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8, + TaskAttemptState.RUNNING); + TaskAttemptStatusUpdateEvent event = + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + appEventHandler.handle(event); + } + } + + Random generator = new Random(); + Object[] taskValues = tasks.values().toArray(); + Task taskToBeSpeculated = + (Task) taskValues[generator.nextInt(taskValues.length)]; + + // Other than one random task, finish every other task. + for (Map.Entry mapTask : tasks.entrySet()) { + for (Map.Entry taskAttempt : mapTask + .getValue().getAttempts().entrySet()) { + if (mapTask.getKey() != taskToBeSpeculated.getID()) { + appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), + TaskAttemptEventType.TA_DONE)); + appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); + } + } + } + + int maxTimeWait = 10; + boolean successfullySpeculated = false; + while (maxTimeWait > 0 && !successfullySpeculated) { + if (taskToBeSpeculated.getAttempts().size() != 2) { + Thread.sleep(1000); + clock.setTime(System.currentTimeMillis() + 20000); + } else { + successfullySpeculated = true; + } + maxTimeWait--; + } + Assert + .assertTrue("Couldn't speculate successfully", successfullySpeculated); + } + + @Test(timeout = 60000) + public void testSepculateSuccessfulWithUpdateEvents() throws Exception { + + Clock actualClock = new SystemClock(); + ControlledClock clock = new ControlledClock(actualClock); + clock.setTime(System.currentTimeMillis()); + + MRApp app = + new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock); + Job job = app.submit(new Configuration(), true, true); + app.waitForState(job, JobState.RUNNING); + + Map tasks = job.getTasks(); + Assert.assertEquals("Num tasks is not correct", NUM_MAPPERS + NUM_REDUCERS, + tasks.size()); + Iterator taskIter = tasks.values().iterator(); + while (taskIter.hasNext()) { + app.waitForState(taskIter.next(), TaskState.RUNNING); + } + + // Process the update events + clock.setTime(System.currentTimeMillis() + 1000); + EventHandler appEventHandler = app.getContext().getEventHandler(); + for (Map.Entry mapTask : tasks.entrySet()) { + for (Map.Entry taskAttempt : mapTask + .getValue().getAttempts().entrySet()) { + TaskAttemptStatus status = + createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5, + TaskAttemptState.RUNNING); + TaskAttemptStatusUpdateEvent event = + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + appEventHandler.handle(event); + } + } + + Task speculatedTask = null; + int numTasksToFinish = NUM_MAPPERS + NUM_REDUCERS - 1; + clock.setTime(System.currentTimeMillis() + 1000); + for (Map.Entry task : tasks.entrySet()) { + for (Map.Entry taskAttempt : task.getValue() + .getAttempts().entrySet()) { + if (numTasksToFinish > 0) { + appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), + TaskAttemptEventType.TA_DONE)); + appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + numTasksToFinish--; + app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED); + } else { + // The last task is chosen for speculation + TaskAttemptStatus status = + createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, + TaskAttemptState.RUNNING); + speculatedTask = task.getValue(); + TaskAttemptStatusUpdateEvent event = + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + appEventHandler.handle(event); + } + } + } + + clock.setTime(System.currentTimeMillis() + 15000); + for (Map.Entry task : tasks.entrySet()) { + for (Map.Entry taskAttempt : task.getValue() + .getAttempts().entrySet()) { + if (taskAttempt.getValue().getState() != TaskAttemptState.SUCCEEDED) { + TaskAttemptStatus status = + createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, + TaskAttemptState.RUNNING); + TaskAttemptStatusUpdateEvent event = + new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status); + appEventHandler.handle(event); + } + } + } + + int maxTimeWait = 5; + boolean successfullySpeculated = false; + while (maxTimeWait > 0 && !successfullySpeculated) { + if (speculatedTask.getAttempts().size() != 2) { + Thread.sleep(1000); + } else { + successfullySpeculated = true; + } + maxTimeWait--; + } + Assert + .assertTrue("Couldn't speculate successfully", successfullySpeculated); + } + + private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id, + float progress, TaskAttemptState state) { + TaskAttemptStatus status = new TaskAttemptStatus(); + status.id = id; + status.progress = progress; + status.taskState = state; + return status; + } +}