MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov

(cherry picked from commit 7dc3c1203d)

(cherry picked from commit 87c2d915f1)
Jason Lowe 2015-05-04 19:02:39 +00:00, committed by Vinod Kumar Vavilapalli
parent afa8189a07
commit 4ea42b84b7
3 changed files with 57 additions and 22 deletions
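For context: before this patch, MergeManagerImpl clamped the heap-derived default for the shuffle memory limit to Integer.MAX_VALUE and cast the final-merge budget down to int, so a reducer with a heap larger than 2 GiB still could not use more than about 2 GiB for in-memory shuffle or the final merge. The following standalone sketch is not part of the patch; the 8 GB heap and buffer fractions are assumed values, used only to walk through the arithmetic the diffs below change.

public class TwoGigCapDemo {
  public static void main(String[] args) {
    // Assumed values: an 8 GB reduce heap and typical buffer fractions.
    long xmx = 8L * 1024 * 1024 * 1024;
    float maxInMemCopyUse = 0.90f; // mapreduce.reduce.shuffle.input.buffer.percent
    float maxRedPer = 1.0f;        // mapreduce.reduce.input.buffer.percent

    // Old code path: the heap size was clamped to Integer.MAX_VALUE before scaling,
    // and the final-merge budget was cast down to int.
    long oldMemoryLimit = (long) (Math.min(xmx, Integer.MAX_VALUE) * maxInMemCopyUse);
    long oldMaxInMemReduce = (int) Math.min(xmx * maxRedPer, Integer.MAX_VALUE);

    // New code path: both limits stay in long arithmetic, so they scale with the heap.
    long newMemoryLimit = (long) (xmx * maxInMemCopyUse);
    long newMaxInMemReduce = (long) (newMemoryLimit * maxRedPer);

    System.out.println("old memoryLimit    = " + oldMemoryLimit);    // ~1.8 GiB, capped
    System.out.println("old maxInMemReduce = " + oldMaxInMemReduce); // 2147483647, capped
    System.out.println("new memoryLimit    = " + newMemoryLimit);    // ~7.2 GiB
    System.out.println("new maxInMemReduce = " + newMaxInMemReduce); // ~7.2 GiB
  }
}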

CHANGES.txt

@@ -27,6 +27,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file
     descriptors (Kuhu Shukla via jlowe)
 
+    MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
+    (Gera Shegalov via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

MergeManagerImpl.java

@@ -94,7 +94,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
 
-  private final long memoryLimit;
+  @VisibleForTesting
+  final long memoryLimit;
   private long usedMemory;
   private long commitMemory;
   private final long maxSingleShuffleLimit;
@@ -167,10 +169,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
 
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit =
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
+    this.memoryLimit = (long)(jobConf.getLong(
+        MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
 
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
@@ -201,7 +202,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
       throw new RuntimeException("Invalid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit should be less than mergeThreshold "
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
           + "mergeThreshold: " + this.mergeThreshold);
     }
@@ -667,6 +668,18 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
   }
 
+  @VisibleForTesting
+  final long getMaxInMemReduceLimit() {
+    final float maxRedPer =
+        jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      throw new RuntimeException(maxRedPer + ": "
+          + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT
+          + " must be a float between 0 and 1.0");
+    }
+    return (long)(memoryLimit * maxRedPer);
+  }
+
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
       List<CompressAwarePath> onDiskMapOutputs
@@ -674,17 +687,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     LOG.info("finalMerge called with " +
              inMemoryMapOutputs.size() + " in-memory map-outputs and " +
              onDiskMapOutputs.size() + " on-disk map-outputs");
-    final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-          maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
+    final long maxInMemReduce = getMaxInMemReduceLimit();
     // merge config params
     Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
     Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
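As a usage sketch, with settings and values assumed rather than taken from the patch: once the limits above are computed in long arithmetic, a job that gives its reducers a large heap can keep more than 2 GiB of map outputs in memory through the final merge by raising mapreduce.reduce.input.buffer.percent; before the fix the effective budget was silently clamped to Integer.MAX_VALUE regardless of heap size.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class LargeFinalMergeConfig {
  // Hypothetical job configuration (all values assumed) illustrating the knobs
  // that feed the limits computed in MergeManagerImpl; not prescriptive tuning advice.
  public static JobConf configure() {
    JobConf conf = new JobConf();
    conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx8g");               // large reduce heap
    conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.70f); // heap fraction for shuffle buffers (memoryLimit)
    conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0.65f);  // fraction retained in memory for the final merge
    return conf;
  }
}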

TestMergeManager.java

@@ -260,4 +260,33 @@ public class TestMergeManager {
     }
   }
+
+  @Test
+  public void testLargeMemoryLimits() throws Exception {
+    final JobConf conf = new JobConf();
+    // Xmx in production
+    conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        8L * 1024 * 1024 * 1024);
+
+    // M1 = Xmx fraction for map outputs
+    conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    // M2 = max M1 fraction for a single map output
+    conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
+
+    // M3 = M1 fraction at which in-memory merge is triggered
+    conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
+
+    // M4 = M1 fraction of map outputs remaining in memory for a reduce
+    conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
+        null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
+        null, null, null, null, null, new MROutputFiles());
+    assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
+        mgr.memoryLimit > Integer.MAX_VALUE);
+    final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
+    assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
+        maxInMemReduce > Integer.MAX_VALUE);
+  }
 }
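The test above simulates an 8 GB reduce heap via REDUCE_MEMORY_TOTAL_BYTES (the override the constructor reads so unit tests can fix the Runtime memory) and asserts that both mgr.memoryLimit and mgr.getMaxInMemReduceLimit() can exceed Integer.MAX_VALUE; the previous final-merge computation clamped its budget to Integer.MAX_VALUE, so values above 2 GiB were unreachable. It should be runnable on its own with something like mvn test -Dtest=TestMergeManager from the hadoop-mapreduce-client-core module (module path assumed).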