From 4159a21d286d9621dc6cf1e489426bdd02d50054 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 11 Jun 2012 13:57:15 +0000 Subject: [PATCH] merge -r 1348845:1348846 from trunk. FIXES: MAPREDUCE-3927 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1348848 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/JobImpl.java | 4 +- .../task/reduce/ShuffleScheduler.java | 38 ++++++----- .../task/reduce/TestShuffleScheduler.java | 67 +++++++++++++++++++ 4 files changed, 95 insertions(+), 17 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5be8f6394a2..8b8e7e5a407 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -454,6 +454,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves) + MAPREDUCE-3927. Shuffle hang when set map.failures.percent + (Bhallamudi Venkata Siva Kamesh via tgraves) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index daafcf7c1b3..220bbc31d6b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -590,9 +590,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, float reduceProgress = 0f; for (Task task : this.tasks.values()) { if (task.getType() == TaskType.MAP) { - mapProgress += task.getProgress(); + mapProgress += (task.isFinished() ? 1f : task.getProgress()); } else { - reduceProgress += task.getProgress(); + reduceProgress += (task.isFinished() ? 1f : task.getProgress()); } } if (this.numMapTasks != 0) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java index df8c6b3ff0d..8d5bc3b9050 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java @@ -137,24 +137,26 @@ class ShuffleScheduler { // update the status totalBytesShuffledTillNow += bytes; - float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024); - int mapsDone = totalMaps - remainingMaps; - long secsSinceStart = - (System.currentTimeMillis()-startTime)/1000+1; - - float transferRate = mbs/secsSinceStart; - progress.set((float) mapsDone / totalMaps); - String statusString = mapsDone + " / " + totalMaps + " copied."; - status.setStateString(statusString); - progress.setStatus("copy(" + mapsDone + " of " + totalMaps - + " at " + - mbpsFormat.format(transferRate) + " MB/s)"); - + updateStatus(); reduceShuffleBytes.increment(bytes); lastProgressTime = System.currentTimeMillis(); - LOG.debug("map " + mapId + " done " + statusString); + LOG.debug("map " + mapId + " done " + status.getStateString()); } } + + private void updateStatus() { + float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024); + int mapsDone = totalMaps - remainingMaps; + long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1; + + float transferRate = mbs / secsSinceStart; + progress.set((float) mapsDone / totalMaps); + String statusString = mapsDone + " / " + totalMaps + " copied."; + status.setStateString(statusString); + + progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at " + + mbpsFormat.format(transferRate) + " MB/s)"); + } public synchronized void copyFailed(TaskAttemptID mapId, MapHost host, boolean readError) { @@ -256,7 +258,13 @@ class ShuffleScheduler { } public synchronized void tipFailed(TaskID taskId) { - finishedMaps[taskId.getId()] = true; + if (!finishedMaps[taskId.getId()]) { + finishedMaps[taskId.getId()] = true; + if (--remainingMaps == 0) { + notifyAll(); + } + updateStatus(); + } } public synchronized void addKnownMapOutput(String hostName, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java new file mode 100644 index 00000000000..f4ed3304062 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.util.Progress; +import org.junit.Assert; +import org.junit.Test; + +public class TestShuffleScheduler { + + @SuppressWarnings("rawtypes") + @Test + public void testTipFailed() throws Exception { + JobConf job = new JobConf(); + job.setNumMapTasks(2); + + TaskStatus status = new TaskStatus() { + @Override + public boolean getIsMap() { + return false; + } + + @Override + public void addFetchFailedMap(TaskAttemptID mapTaskId) { + } + }; + Progress progress = new Progress(); + + ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null, + progress, null, null, null); + + JobID jobId = new JobID(); + TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1); + scheduler.tipFailed(taskId1); + + Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(), + 0.0f); + Assert.assertFalse(scheduler.waitUntilDone(1)); + + TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0); + scheduler.tipFailed(taskId0); + Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(), + 0.0f); + Assert.assertTrue(scheduler.waitUntilDone(1)); + } +}