MAPREDUCE-4842. Shuffle race can hang reducer. Contributed by Mariappan Asokan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1425071 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-12-21 18:24:25 +00:00
parent a276f68b92
commit 92774331cc
4 changed files with 250 additions and 43 deletions

View File

@ -630,6 +630,8 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0 MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0
(Ravi Prakash via jeagles) (Ravi Prakash via jeagles)
MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe)
Release 0.23.5 - UNRELEASED Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings(value={"unchecked", "deprecation"}) import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings(value={"unchecked"})
@InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class MergeManager<K, V> { public class MergeManager<K, V> {
@ -85,7 +87,7 @@ public class MergeManager<K, V> {
Set<MapOutput<K, V>> inMemoryMapOutputs = Set<MapOutput<K, V>> inMemoryMapOutputs =
new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>()); new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
private final InMemoryMerger inMemoryMerger; private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
Set<Path> onDiskMapOutputs = new TreeSet<Path>(); Set<Path> onDiskMapOutputs = new TreeSet<Path>();
private final OnDiskMerger onDiskMerger; private final OnDiskMerger onDiskMerger;
@ -179,6 +181,8 @@ public class MergeManager<K, V> {
+ singleShuffleMemoryLimitPercent); + singleShuffleMemoryLimitPercent);
} }
usedMemory = 0L;
commitMemory = 0L;
this.maxSingleShuffleLimit = this.maxSingleShuffleLimit =
(long)(memoryLimit * singleShuffleMemoryLimitPercent); (long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold = this.memToMemMergeOutputsThreshold =
@ -210,7 +214,7 @@ public class MergeManager<K, V> {
this.memToMemMerger = null; this.memToMemMerger = null;
} }
this.inMemoryMerger = new InMemoryMerger(this); this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start(); this.inMemoryMerger.start();
this.onDiskMerger = new OnDiskMerger(this); this.onDiskMerger = new OnDiskMerger(this);
@ -219,11 +223,19 @@ public class MergeManager<K, V> {
this.mergePhase = mergePhase; this.mergePhase = mergePhase;
} }
protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
return new InMemoryMerger(this);
}
TaskAttemptID getReduceId() { TaskAttemptID getReduceId() {
return reduceId; return reduceId;
} }
@VisibleForTesting
ExceptionReporter getExceptionReporter() {
return exceptionReporter;
}
public void waitForInMemoryMerge() throws InterruptedException { public void waitForInMemoryMerge() throws InterruptedException {
inMemoryMerger.waitForMerge(); inMemoryMerger.waitForMerge();
} }
@ -288,7 +300,6 @@ public class MergeManager<K, V> {
} }
synchronized void unreserve(long size) { synchronized void unreserve(long size) {
commitMemory -= size;
usedMemory -= size; usedMemory -= size;
} }
@ -300,24 +311,20 @@ public class MergeManager<K, V> {
commitMemory+= mapOutput.getSize(); commitMemory+= mapOutput.getSize();
synchronized (inMemoryMerger) { // Can hang if mergeThreshold is really low.
// Can hang if mergeThreshold is really low. if (commitMemory >= mergeThreshold) {
if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) { LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
LOG.info("Starting inMemoryMerger's merge since commitMemory=" + commitMemory + " > mergeThreshold=" + mergeThreshold +
commitMemory + " > mergeThreshold=" + mergeThreshold + ". Current usedMemory=" + usedMemory);
". Current usedMemory=" + usedMemory); inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear();
inMemoryMergedMapOutputs.clear(); inMemoryMerger.startMerge(inMemoryMapOutputs);
inMemoryMerger.startMerge(inMemoryMapOutputs); commitMemory = 0L; // Reset commitMemory.
}
} }
if (memToMemMerger != null) { if (memToMemMerger != null) {
synchronized (memToMemMerger) { if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
if (!memToMemMerger.isInProgress() && memToMemMerger.startMerge(inMemoryMapOutputs);
inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
memToMemMerger.startMerge(inMemoryMapOutputs);
}
} }
} }
} }
@ -333,11 +340,8 @@ public class MergeManager<K, V> {
public synchronized void closeOnDiskFile(Path file) { public synchronized void closeOnDiskFile(Path file) {
onDiskMapOutputs.add(file); onDiskMapOutputs.add(file);
synchronized (onDiskMerger) { if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
if (!onDiskMerger.isInProgress() && onDiskMerger.startMerge(onDiskMapOutputs);
onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
onDiskMerger.startMerge(onDiskMapOutputs);
}
} }
} }

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(MergeThread.class); private static final Log LOG = LogFactory.getLog(MergeThread.class);
private volatile boolean inProgress = false; private AtomicInteger numPending = new AtomicInteger(0);
private List<T> inputs = new ArrayList<T>(); private LinkedList<List<T>> pendingToBeMerged;
protected final MergeManager<K,V> manager; protected final MergeManager<K,V> manager;
private final ExceptionReporter reporter; private final ExceptionReporter reporter;
private boolean closed = false; private boolean closed = false;
@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extends Thread {
public MergeThread(MergeManager<K,V> manager, int mergeFactor, public MergeThread(MergeManager<K,V> manager, int mergeFactor,
ExceptionReporter reporter) { ExceptionReporter reporter) {
this.pendingToBeMerged = new LinkedList<List<T>>();
this.manager = manager; this.manager = manager;
this.mergeFactor = mergeFactor; this.mergeFactor = mergeFactor;
this.reporter = reporter; this.reporter = reporter;
@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extends Thread {
interrupt(); interrupt();
} }
public synchronized boolean isInProgress() { public void startMerge(Set<T> inputs) {
return inProgress;
}
public synchronized void startMerge(Set<T> inputs) {
if (!closed) { if (!closed) {
inProgress = true; numPending.incrementAndGet();
this.inputs = new ArrayList<T>(); List<T> toMergeInputs = new ArrayList<T>();
Iterator<T> iter=inputs.iterator(); Iterator<T> iter=inputs.iterator();
for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) { for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
this.inputs.add(iter.next()); toMergeInputs.add(iter.next());
iter.remove(); iter.remove();
} }
LOG.info(getName() + ": Starting merge with " + this.inputs.size() + LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() +
" segments, while ignoring " + inputs.size() + " segments"); " segments, while ignoring " + inputs.size() + " segments");
notifyAll(); synchronized(pendingToBeMerged) {
pendingToBeMerged.addLast(toMergeInputs);
pendingToBeMerged.notifyAll();
}
} }
} }
public synchronized void waitForMerge() throws InterruptedException { public synchronized void waitForMerge() throws InterruptedException {
while (inProgress) { while (numPending.get() > 0) {
wait(); wait();
} }
} }
public void run() { public void run() {
while (true) { while (true) {
List<T> inputs = null;
try { try {
// Wait for notification to start the merge... // Wait for notification to start the merge...
synchronized (this) { synchronized (pendingToBeMerged) {
while (!inProgress) { while(pendingToBeMerged.size() <= 0) {
wait(); pendingToBeMerged.wait();
} }
// Pickup the inputs to merge.
inputs = pendingToBeMerged.removeFirst();
} }
// Merge // Merge
merge(inputs); merge(inputs);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
numPending.set(0);
return; return;
} catch(Throwable t) { } catch(Throwable t) {
numPending.set(0);
reporter.reportException(t); reporter.reportException(t);
return; return;
} finally { } finally {
synchronized (this) { synchronized (this) {
// Clear inputs numPending.decrementAndGet();
inputs = null;
inProgress = false;
notifyAll(); notifyAll();
} }
} }

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.reduce;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.junit.Assert;
import org.junit.Test;
public class TestMergeManager {
@Test(timeout=10000)
public void testMemoryMerge() throws Exception {
final int TOTAL_MEM_BYTES = 10000;
final int OUTPUT_SIZE = 7950;
JobConf conf = new JobConf();
conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, TOTAL_MEM_BYTES);
conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.8f);
conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.9f);
TestExceptionReporter reporter = new TestExceptionReporter();
CyclicBarrier mergeStart = new CyclicBarrier(2);
CyclicBarrier mergeComplete = new CyclicBarrier(2);
StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter,
mergeStart, mergeComplete);
// reserve enough map output to cause a merge when it is committed
MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
Assert.assertEquals("Should be a memory merge",
Type.MEMORY, out1.getType());
fillOutput(out1);
MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
Assert.assertEquals("Should be a memory merge",
Type.MEMORY, out2.getType());
fillOutput(out2);
// next reservation should be a WAIT
MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
Assert.assertEquals("Should be told to wait",
Type.WAIT, out3.getType());
// trigger the first merge and wait for merge thread to start merging
// and free enough output to reserve more
out1.commit();
out2.commit();
mergeStart.await();
Assert.assertEquals(1, mgr.getNumMerges());
// reserve enough map output to cause another merge when committed
out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
Assert.assertEquals("Should be a memory merge",
Type.MEMORY, out1.getType());
fillOutput(out1);
out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
Assert.assertEquals("Should be a memory merge",
Type.MEMORY, out2.getType());
fillOutput(out2);
// next reservation should be a WAIT
out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
Assert.assertEquals("Should be told to wait",
Type.WAIT, out3.getType());
// commit output *before* merge thread completes
out1.commit();
out2.commit();
// allow the first merge to complete
mergeComplete.await();
// start the second merge and verify
mergeStart.await();
Assert.assertEquals(2, mgr.getNumMerges());
// trigger the end of the second merge
mergeComplete.await();
Assert.assertEquals(2, mgr.getNumMerges());
Assert.assertEquals("exception reporter invoked",
0, reporter.getNumExceptions());
}
private void fillOutput(MapOutput<Text, Text> output) throws IOException {
BoundedByteArrayOutputStream stream = output.getArrayStream();
int count = stream.getLimit();
for (int i=0; i < count; ++i) {
stream.write(i);
}
}
private static class StubbedMergeManager extends MergeManager<Text, Text> {
private TestMergeThread mergeThread;
public StubbedMergeManager(JobConf conf, ExceptionReporter reporter,
CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
super(null, conf, mock(LocalFileSystem.class), null, null, null, null,
null, null, null, null, reporter, null, mock(MapOutputFile.class));
mergeThread.setSyncBarriers(mergeStart, mergeComplete);
}
@Override
protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
mergeThread = new TestMergeThread(this, getExceptionReporter());
return mergeThread;
}
public int getNumMerges() {
return mergeThread.getNumMerges();
}
}
private static class TestMergeThread
extends MergeThread<MapOutput<Text,Text>, Text, Text> {
private AtomicInteger numMerges;
private CyclicBarrier mergeStart;
private CyclicBarrier mergeComplete;
public TestMergeThread(MergeManager<Text, Text> mergeManager,
ExceptionReporter reporter) {
super(mergeManager, Integer.MAX_VALUE, reporter);
numMerges = new AtomicInteger(0);
}
public synchronized void setSyncBarriers(
CyclicBarrier mergeStart, CyclicBarrier mergeComplete) {
this.mergeStart = mergeStart;
this.mergeComplete = mergeComplete;
}
public int getNumMerges() {
return numMerges.get();
}
@Override
public void merge(List<MapOutput<Text, Text>> inputs)
throws IOException {
synchronized (this) {
numMerges.incrementAndGet();
for (MapOutput<Text, Text> input : inputs) {
manager.unreserve(input.getSize());
}
}
try {
mergeStart.await();
mergeComplete.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
private static class TestExceptionReporter implements ExceptionReporter {
private List<Throwable> exceptions = new ArrayList<Throwable>();
@Override
public void reportException(Throwable t) {
exceptions.add(t);
t.printStackTrace();
}
public int getNumExceptions() {
return exceptions.size();
}
}
}