From 4ea42b84b7b3adaa0be5e857cc5f996d5d8a98bf Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Mon, 4 May 2015 19:02:39 +0000
Subject: [PATCH] MAPREDUCE-5649. Reduce cannot use more than 2G memory for
 the final merge. Contributed by Gera Shegalov

(cherry picked from commit 7dc3c1203d1ab14c09d0aaf0869a5bcdfafb0a5a)
(cherry picked from commit 87c2d915f1cc799cb4020c945c04d3ecb82ee963)
---
 hadoop-mapreduce-project/CHANGES.txt          |  3 ++
 .../task/reduce/MergeManagerImpl.java         | 47 ++++++++++---------
 .../task/reduce/TestMergeManager.java         | 29 ++++++++++++
 3 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index efa0f91256e..63b6129e770 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -27,6 +27,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file
     descriptors (Kuhu Shukla via jlowe)
 
+    MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
+    (Gera Shegalov via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index a4b1aa82e8f..3699ddd4bb9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -93,8 +93,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
   Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
-  
-  private final long memoryLimit;
+
+  @VisibleForTesting
+  final long memoryLimit;
+
   private long usedMemory;
   private long commitMemory;
   private final long maxSingleShuffleLimit;
@@ -167,11 +169,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
 
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
-
+    this.memoryLimit = (long)(jobConf.getLong(
+        MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
+
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
 
     final float singleShuffleMemoryLimitPercent =
@@ -201,7 +202,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
       throw new RuntimeException("Invalid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit should be less than mergeThreshold "
          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
          + "mergeThreshold: " + this.mergeThreshold);
     }
@@ -667,24 +668,26 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
   }
 
+  @VisibleForTesting
+  final long getMaxInMemReduceLimit() {
+    final float maxRedPer =
+        jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      throw new RuntimeException(maxRedPer + ": "
+          + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT
+          + " must be a float between 0 and 1.0");
+    }
+    return (long)(memoryLimit * maxRedPer);
+  }
+
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
                                        List<CompressAwarePath> onDiskMapOutputs
                                        ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " +
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
-    final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    
-    
+    LOG.info("finalMerge called with " +
+        inMemoryMapOutputs.size() + " in-memory map-outputs and " +
+        onDiskMapOutputs.size() + " on-disk map-outputs");
+    final long maxInMemReduce = getMaxInMemReduceLimit();
     // merge config params
     Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
     Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 8d6bab92738..ef860afaf85 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -260,4 +260,33 @@ public class TestMergeManager {
 
     }
   }
+
+  @Test
+  public void testLargeMemoryLimits() throws Exception {
+    final JobConf conf = new JobConf();
+    // Xmx in production
+    conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        8L * 1024 * 1024 * 1024);
+
+    // M1 = Xmx fraction for map outputs
+    conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    // M2 = max M1 fraction for a single map output
+    conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
+
+    // M3 = M1 fraction at which in memory merge is triggered
+    conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
+
+    // M4 = M1 fraction of map outputs remaining in memory for a reduce
+    conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    final MergeManagerImpl mgr = new MergeManagerImpl(
+        null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
+        null, null, null, null, null, new MROutputFiles());
+    assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
+        mgr.memoryLimit > Integer.MAX_VALUE);
+    final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
+    assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
+        maxInMemReduce > Integer.MAX_VALUE);
+  }
 }
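
Reviewer note, not part of the patch: the self-contained sketch below mirrors the arithmetic this change touches and shows why the pre-patch code could never budget more than about 2 GiB for the shuffle or the final merge, even with an 8 GiB heap. The class and variable names are illustrative only; the configuration keys in the comments are the ones behind MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT and MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, and the MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES override is ignored here for brevity.

// MergeLimitSketch.java -- illustrative only, not part of the Hadoop source tree.
public class MergeLimitSketch {
  public static void main(String[] args) {
    final long heapBytes = 8L * 1024 * 1024 * 1024;  // stand-in for Runtime.getRuntime().maxMemory() under -Xmx8g
    final float shuffleInputBufferPercent = 1.0f;    // mapreduce.reduce.shuffle.input.buffer.percent
    final float reduceInputBufferPercent = 1.0f;     // mapreduce.reduce.input.buffer.percent

    // Pre-patch: the heap size is clamped to Integer.MAX_VALUE before scaling,
    // and the final-merge budget is further narrowed through an int cast.
    final long oldMemoryLimit =
        (long) (Math.min(heapBytes, Integer.MAX_VALUE) * shuffleInputBufferPercent);
    final int oldMaxInMemReduce =
        (int) Math.min(heapBytes * reduceInputBufferPercent, Integer.MAX_VALUE);

    // Post-patch: the computation stays in long arithmetic with no 2 GiB clamp.
    final long newMemoryLimit = (long) (heapBytes * shuffleInputBufferPercent);
    final long newMaxInMemReduce = (long) (newMemoryLimit * reduceInputBufferPercent);

    System.out.println("old memoryLimit    ~ " + oldMemoryLimit);    // about 2 GiB
    System.out.println("old maxInMemReduce = " + oldMaxInMemReduce); // Integer.MAX_VALUE
    System.out.println("new memoryLimit    = " + newMemoryLimit);    // 8 GiB
    System.out.println("new maxInMemReduce = " + newMaxInMemReduce); // 8 GiB
  }
}

With an 8 GiB heap and both percentages at 1.0, the patched MergeManagerImpl reports memoryLimit and getMaxInMemReduceLimit() above Integer.MAX_VALUE, which is exactly what the new testLargeMemoryLimits asserts.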