From 891e616d5110cd350bf4b16098305e7c09f09748 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Fri, 21 Dec 2012 18:32:57 +0000 Subject: [PATCH] svn merge -c 1425071 FIXES: MAPREDUCE-4842. Shuffle race can hang reducer. Contributed by Mariappan Asokan git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1425074 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../mapreduce/task/reduce/MergeManager.java | 52 ++--- .../mapreduce/task/reduce/MergeThread.java | 43 ++-- .../task/reduce/TestMergeManager.java | 196 ++++++++++++++++++ 4 files changed, 250 insertions(+), 43 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 840057e254d..b8df9a9f0a2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -473,6 +473,8 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0 (Ravi Prakash via jeagles) + MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java index 29503ceb814..e658be47c95 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java @@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; -@SuppressWarnings(value={"unchecked", "deprecation"}) +import com.google.common.annotations.VisibleForTesting; + +@SuppressWarnings(value={"unchecked"}) @InterfaceAudience.Private @InterfaceStability.Unstable public class MergeManager { @@ -85,7 +87,7 @@ public class MergeManager { Set> inMemoryMapOutputs = new TreeSet>(new MapOutputComparator()); - private final InMemoryMerger inMemoryMerger; + private final MergeThread, K,V> inMemoryMerger; Set onDiskMapOutputs = new TreeSet(); private final OnDiskMerger onDiskMerger; @@ -179,6 +181,8 @@ public class MergeManager { + singleShuffleMemoryLimitPercent); } + usedMemory = 0L; + commitMemory = 0L; this.maxSingleShuffleLimit = (long)(memoryLimit * singleShuffleMemoryLimitPercent); this.memToMemMergeOutputsThreshold = @@ -210,7 +214,7 @@ public class MergeManager { this.memToMemMerger = null; } - this.inMemoryMerger = new InMemoryMerger(this); + this.inMemoryMerger = createInMemoryMerger(); this.inMemoryMerger.start(); this.onDiskMerger = new OnDiskMerger(this); @@ -219,11 +223,19 @@ public class MergeManager { this.mergePhase = mergePhase; } + protected MergeThread, K,V> createInMemoryMerger() { + return new InMemoryMerger(this); + } TaskAttemptID getReduceId() { return reduceId; } + @VisibleForTesting + ExceptionReporter getExceptionReporter() { + return exceptionReporter; + } + public void waitForInMemoryMerge() throws InterruptedException { inMemoryMerger.waitForMerge(); } @@ -288,7 +300,6 @@ public class MergeManager { } synchronized void unreserve(long size) { - commitMemory -= size; usedMemory -= size; } @@ -300,24 +311,20 @@ public class MergeManager { commitMemory+= mapOutput.getSize(); - synchronized (inMemoryMerger) { - // Can hang if mergeThreshold is really low. - if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) { - LOG.info("Starting inMemoryMerger's merge since commitMemory=" + - commitMemory + " > mergeThreshold=" + mergeThreshold + - ". Current usedMemory=" + usedMemory); - inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); - inMemoryMergedMapOutputs.clear(); - inMemoryMerger.startMerge(inMemoryMapOutputs); - } + // Can hang if mergeThreshold is really low. + if (commitMemory >= mergeThreshold) { + LOG.info("Starting inMemoryMerger's merge since commitMemory=" + + commitMemory + " > mergeThreshold=" + mergeThreshold + + ". Current usedMemory=" + usedMemory); + inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); + inMemoryMergedMapOutputs.clear(); + inMemoryMerger.startMerge(inMemoryMapOutputs); + commitMemory = 0L; // Reset commitMemory. } if (memToMemMerger != null) { - synchronized (memToMemMerger) { - if (!memToMemMerger.isInProgress() && - inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { - memToMemMerger.startMerge(inMemoryMapOutputs); - } + if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { + memToMemMerger.startMerge(inMemoryMapOutputs); } } } @@ -333,11 +340,8 @@ public class MergeManager { public synchronized void closeOnDiskFile(Path file) { onDiskMapOutputs.add(file); - synchronized (onDiskMerger) { - if (!onDiskMerger.isInProgress() && - onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { - onDiskMerger.startMerge(onDiskMapOutputs); - } + if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { + onDiskMerger.startMerge(onDiskMapOutputs); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java index f5d89a3efc7..568f4e6ffec 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java @@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,8 +32,8 @@ abstract class MergeThread extends Thread { private static final Log LOG = LogFactory.getLog(MergeThread.class); - private volatile boolean inProgress = false; - private List inputs = new ArrayList(); + private AtomicInteger numPending = new AtomicInteger(0); + private LinkedList> pendingToBeMerged; protected final MergeManager manager; private final ExceptionReporter reporter; private boolean closed = false; @@ -39,6 +41,7 @@ abstract class MergeThread extends Thread { public MergeThread(MergeManager manager, int mergeFactor, ExceptionReporter reporter) { + this.pendingToBeMerged = new LinkedList>(); this.manager = manager; this.mergeFactor = mergeFactor; this.reporter = reporter; @@ -50,53 +53,55 @@ abstract class MergeThread extends Thread { interrupt(); } - public synchronized boolean isInProgress() { - return inProgress; - } - - public synchronized void startMerge(Set inputs) { + public void startMerge(Set inputs) { if (!closed) { - inProgress = true; - this.inputs = new ArrayList(); + numPending.incrementAndGet(); + List toMergeInputs = new ArrayList(); Iterator iter=inputs.iterator(); for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) { - this.inputs.add(iter.next()); + toMergeInputs.add(iter.next()); iter.remove(); } - LOG.info(getName() + ": Starting merge with " + this.inputs.size() + + LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + " segments, while ignoring " + inputs.size() + " segments"); - notifyAll(); + synchronized(pendingToBeMerged) { + pendingToBeMerged.addLast(toMergeInputs); + pendingToBeMerged.notifyAll(); + } } } public synchronized void waitForMerge() throws InterruptedException { - while (inProgress) { + while (numPending.get() > 0) { wait(); } } public void run() { while (true) { + List inputs = null; try { // Wait for notification to start the merge... - synchronized (this) { - while (!inProgress) { - wait(); + synchronized (pendingToBeMerged) { + while(pendingToBeMerged.size() <= 0) { + pendingToBeMerged.wait(); } + // Pickup the inputs to merge. + inputs = pendingToBeMerged.removeFirst(); } // Merge merge(inputs); } catch (InterruptedException ie) { + numPending.set(0); return; } catch(Throwable t) { + numPending.set(0); reporter.reportException(t); return; } finally { synchronized (this) { - // Clear inputs - inputs = null; - inProgress = false; + numPending.decrementAndGet(); notifyAll(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java new file mode 100644 index 00000000000..a8669639b2a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.reduce; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.io.BoundedByteArrayOutputStream; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type; +import org.junit.Assert; +import org.junit.Test; + +public class TestMergeManager { + + @Test(timeout=10000) + public void testMemoryMerge() throws Exception { + final int TOTAL_MEM_BYTES = 10000; + final int OUTPUT_SIZE = 7950; + JobConf conf = new JobConf(); + conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f); + conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, TOTAL_MEM_BYTES); + conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.8f); + conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.9f); + TestExceptionReporter reporter = new TestExceptionReporter(); + CyclicBarrier mergeStart = new CyclicBarrier(2); + CyclicBarrier mergeComplete = new CyclicBarrier(2); + StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter, + mergeStart, mergeComplete); + + // reserve enough map output to cause a merge when it is committed + MapOutput out1 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out1.getType()); + fillOutput(out1); + MapOutput out2 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out2.getType()); + fillOutput(out2); + + // next reservation should be a WAIT + MapOutput out3 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be told to wait", + Type.WAIT, out3.getType()); + + // trigger the first merge and wait for merge thread to start merging + // and free enough output to reserve more + out1.commit(); + out2.commit(); + mergeStart.await(); + + Assert.assertEquals(1, mgr.getNumMerges()); + + // reserve enough map output to cause another merge when committed + out1 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out1.getType()); + fillOutput(out1); + out2 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be a memory merge", + Type.MEMORY, out2.getType()); + fillOutput(out2); + + // next reservation should be a WAIT + out3 = mgr.reserve(null, OUTPUT_SIZE, 0); + Assert.assertEquals("Should be told to wait", + Type.WAIT, out3.getType()); + + // commit output *before* merge thread completes + out1.commit(); + out2.commit(); + + // allow the first merge to complete + mergeComplete.await(); + + // start the second merge and verify + mergeStart.await(); + Assert.assertEquals(2, mgr.getNumMerges()); + + // trigger the end of the second merge + mergeComplete.await(); + + Assert.assertEquals(2, mgr.getNumMerges()); + Assert.assertEquals("exception reporter invoked", + 0, reporter.getNumExceptions()); + } + + private void fillOutput(MapOutput output) throws IOException { + BoundedByteArrayOutputStream stream = output.getArrayStream(); + int count = stream.getLimit(); + for (int i=0; i < count; ++i) { + stream.write(i); + } + } + + private static class StubbedMergeManager extends MergeManager { + private TestMergeThread mergeThread; + + public StubbedMergeManager(JobConf conf, ExceptionReporter reporter, + CyclicBarrier mergeStart, CyclicBarrier mergeComplete) { + super(null, conf, mock(LocalFileSystem.class), null, null, null, null, + null, null, null, null, reporter, null, mock(MapOutputFile.class)); + mergeThread.setSyncBarriers(mergeStart, mergeComplete); + } + + @Override + protected MergeThread, Text, Text> createInMemoryMerger() { + mergeThread = new TestMergeThread(this, getExceptionReporter()); + return mergeThread; + } + + public int getNumMerges() { + return mergeThread.getNumMerges(); + } + } + + private static class TestMergeThread + extends MergeThread, Text, Text> { + private AtomicInteger numMerges; + private CyclicBarrier mergeStart; + private CyclicBarrier mergeComplete; + + public TestMergeThread(MergeManager mergeManager, + ExceptionReporter reporter) { + super(mergeManager, Integer.MAX_VALUE, reporter); + numMerges = new AtomicInteger(0); + } + + public synchronized void setSyncBarriers( + CyclicBarrier mergeStart, CyclicBarrier mergeComplete) { + this.mergeStart = mergeStart; + this.mergeComplete = mergeComplete; + } + + public int getNumMerges() { + return numMerges.get(); + } + + @Override + public void merge(List> inputs) + throws IOException { + synchronized (this) { + numMerges.incrementAndGet(); + for (MapOutput input : inputs) { + manager.unreserve(input.getSize()); + } + } + + try { + mergeStart.await(); + mergeComplete.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + } + + private static class TestExceptionReporter implements ExceptionReporter { + private List exceptions = new ArrayList(); + + @Override + public void reportException(Throwable t) { + exceptions.add(t); + t.printStackTrace(); + } + + public int getNumExceptions() { + return exceptions.size(); + } + } +}