diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 852617beb70..6ac67851092 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -523,6 +523,9 @@ Release 0.23.1 - Unreleased CapacityScheduler so that it deducts current-usage per user and not per-application. (Arun C Murthy via vinodkv) + MAPREDUCE-3721. Fixed a race in shuffle which caused reduces to hang. + (sseth via acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java index edf332ac975..82642b181c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java @@ -92,6 +92,7 @@ public class MergeManager { private final long memoryLimit; private long usedMemory; + private long commitMemory; private final long maxSingleShuffleLimit; private final int memToMemMergeOutputsThreshold; @@ -181,6 +182,13 @@ public class MergeManager { "ioSortFactor=" + ioSortFactor + ", " + "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold); + if (this.maxSingleShuffleLimit >= this.mergeThreshold) { + throw new RuntimeException("Invlaid configuration: " + + "maxSingleShuffleLimit should be less than mergeThreshold" + + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + + "mergeThreshold: " + this.mergeThreshold); + } + boolean allowMemToMemMerge = jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false); if (allowMemToMemMerge) { @@ -245,16 +253,16 @@ public class MergeManager { // all the stalled threads if (usedMemory > memoryLimit) { - LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory + - ") is greater than memoryLimit (" + memoryLimit + ")"); - + LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory + + ") is greater than memoryLimit (" + memoryLimit + ")." + + " CommitMemory is (" + commitMemory + ")"); return stallShuffle; } // Allow the in-memory shuffle to progress - LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" + - usedMemory + - ") is lesser than memoryLimit (" + memoryLimit + ")"); + LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" + + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")." + + "CommitMemory is (" + commitMemory + ")"); return unconditionalReserve(mapId, requestedSize, true); } @@ -270,18 +278,24 @@ public class MergeManager { } synchronized void unreserve(long size) { + commitMemory -= size; usedMemory -= size; } - + public synchronized void closeInMemoryFile(MapOutput mapOutput) { inMemoryMapOutputs.add(mapOutput); LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() - + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()); - + + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() + + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory); + + commitMemory+= mapOutput.getSize(); + synchronized (inMemoryMerger) { - if (!inMemoryMerger.isInProgress() && usedMemory >= mergeThreshold) { - LOG.info("Starting inMemoryMerger's merge since usedMemory=" + - usedMemory + " > mergeThreshold=" + mergeThreshold); + // Can hang if mergeThreshold is really low. + if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) { + LOG.info("Starting inMemoryMerger's merge since commitMemory=" + + commitMemory + " > mergeThreshold=" + mergeThreshold + + ". Current usedMemory=" + usedMemory); inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear(); inMemoryMerger.startMerge(inMemoryMapOutputs);