merge -r 1348845:1348846 from trunk. FIXES: MAPREDUCE-3927

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1348848 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-06-11 13:57:15 +00:00
parent 1f564d1135
commit 4159a21d28
4 changed files with 95 additions and 17 deletions

View File

@ -454,6 +454,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves) MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves)
MAPREDUCE-3927. Shuffle hang when set map.failures.percent
(Bhallamudi Venkata Siva Kamesh via tgraves)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -590,9 +590,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
float reduceProgress = 0f; float reduceProgress = 0f;
for (Task task : this.tasks.values()) { for (Task task : this.tasks.values()) {
if (task.getType() == TaskType.MAP) { if (task.getType() == TaskType.MAP) {
mapProgress += task.getProgress(); mapProgress += (task.isFinished() ? 1f : task.getProgress());
} else { } else {
reduceProgress += task.getProgress(); reduceProgress += (task.isFinished() ? 1f : task.getProgress());
} }
} }
if (this.numMapTasks != 0) { if (this.numMapTasks != 0) {

View File

@ -137,23 +137,25 @@ class ShuffleScheduler<K,V> {
// update the status // update the status
totalBytesShuffledTillNow += bytes; totalBytesShuffledTillNow += bytes;
updateStatus();
reduceShuffleBytes.increment(bytes);
lastProgressTime = System.currentTimeMillis();
LOG.debug("map " + mapId + " done " + status.getStateString());
}
}
private void updateStatus() {
float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024); float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
int mapsDone = totalMaps - remainingMaps; int mapsDone = totalMaps - remainingMaps;
long secsSinceStart = long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
(System.currentTimeMillis()-startTime)/1000+1;
float transferRate = mbs / secsSinceStart; float transferRate = mbs / secsSinceStart;
progress.set((float) mapsDone / totalMaps); progress.set((float) mapsDone / totalMaps);
String statusString = mapsDone + " / " + totalMaps + " copied."; String statusString = mapsDone + " / " + totalMaps + " copied.";
status.setStateString(statusString); status.setStateString(statusString);
progress.setStatus("copy(" + mapsDone + " of " + totalMaps
+ " at " +
mbpsFormat.format(transferRate) + " MB/s)");
reduceShuffleBytes.increment(bytes); progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
lastProgressTime = System.currentTimeMillis(); + mbpsFormat.format(transferRate) + " MB/s)");
LOG.debug("map " + mapId + " done " + statusString);
}
} }
public synchronized void copyFailed(TaskAttemptID mapId, MapHost host, public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
@ -256,7 +258,13 @@ class ShuffleScheduler<K,V> {
} }
public synchronized void tipFailed(TaskID taskId) { public synchronized void tipFailed(TaskID taskId) {
if (!finishedMaps[taskId.getId()]) {
finishedMaps[taskId.getId()] = true; finishedMaps[taskId.getId()] = true;
if (--remainingMaps == 0) {
notifyAll();
}
updateStatus();
}
} }
public synchronized void addKnownMapOutput(String hostName, public synchronized void addKnownMapOutput(String hostName,

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.reduce;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.Assert;
import org.junit.Test;
public class TestShuffleScheduler {
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null,
progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
}