diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index c99a330478a..ec4cb9d93cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -99,7 +99,9 @@ public class MergeManagerImpl implements MergeManager { private long usedMemory; private long commitMemory; - private final long maxSingleShuffleLimit; + + @VisibleForTesting + final long maxSingleShuffleLimit; private final int memToMemMergeOutputsThreshold; private final long mergeThreshold; @@ -187,10 +189,16 @@ public class MergeManagerImpl implements MergeManager { usedMemory = 0L; commitMemory = 0L; - this.maxSingleShuffleLimit = - (long)(memoryLimit * singleShuffleMemoryLimitPercent); - this.memToMemMergeOutputsThreshold = - jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); + long maxSingleShuffleLimitConfiged = + (long)(memoryLimit * singleShuffleMemoryLimitPercent); + if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) { + maxSingleShuffleLimitConfiged = Integer.MAX_VALUE; + LOG.info("The max number of bytes for a single in-memory shuffle cannot" + + " be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE"); + } + this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged; + this.memToMemMergeOutputsThreshold = + jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); this.mergeThreshold = (long)(this.memoryLimit * jobConf.getFloat( MRJobConfig.SHUFFLE_MERGE_PERCENT, @@ -249,17 +257,13 @@ public class MergeManagerImpl implements MergeManager { public void waitForResource() throws InterruptedException { inMemoryMerger.waitForMerge(); } - - private boolean canShuffleToMemory(long requestedSize) { - return (requestedSize < maxSingleShuffleLimit); - } - + @Override public synchronized MapOutput reserve(TaskAttemptID mapId, long requestedSize, int fetcher ) throws IOException { - if (!canShuffleToMemory(requestedSize)) { + if (requestedSize > maxSingleShuffleLimit) { LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java index ef860afaf85..f164b928559 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java @@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MROutputFiles; import org.apache.hadoop.mapred.MapOutputFile; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; import org.junit.Assert; 
import org.junit.Test; @@ -288,5 +289,18 @@ public class TestMergeManager { final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, maxInMemReduce > Integer.MAX_VALUE); + assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE", + Integer.MAX_VALUE, mgr.maxSingleShuffleLimit); + verifyReservedMapOutputType(mgr, 10L, "MEMORY"); + verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK"); + } + + private void verifyReservedMapOutputType(MergeManagerImpl mgr, + long size, String expectedShuffleMode) throws IOException { + final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1"); + final MapOutput mapOutput = mgr.reserve(mapId, size, 1); + assertEquals("Shuffled bytes: " + size, expectedShuffleMode, + mapOutput.getDescription()); + mgr.unreserve(size); } }