MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov

(cherry picked from commit 7dc3c1203d)

(cherry picked from commit 87c2d915f1)
(cherry picked from commit 1da62ba736f5f161a18a52b7ca0d212786f3848c)
Jason Lowe 2015-05-04 19:02:39 +00:00 committed by Vinod Kumar Vavilapalli
parent c90f025e90
commit 41601eae6b
3 changed files with 57 additions and 22 deletions
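For context (not part of the commit): a minimal, self-contained sketch of the arithmetic this change fixes. Before the patch the final-merge budget was clamped to Integer.MAX_VALUE and truncated to an int, so no heap size could push it past roughly 2 GiB; after the patch the value is kept as a long. The 8 GiB heap and the 1.0 buffer fraction below are assumed example values, and the demo class name is invented for the illustration.

// Illustration only: old vs. new final-merge limit arithmetic.
public class FinalMergeLimitDemo {
  public static void main(String[] args) {
    final long heapBytes = 8L * 1024 * 1024 * 1024; // e.g. -Xmx8g
    final float maxRedPer = 1.0f;                   // mapreduce.reduce.input.buffer.percent

    // Pre-patch style: clamp to Integer.MAX_VALUE and truncate to int.
    int oldLimit = (int) Math.min(heapBytes * maxRedPer, Integer.MAX_VALUE);

    // Post-patch style: keep the full long value.
    long newLimit = (long) (heapBytes * maxRedPer);

    System.out.println(oldLimit); // 2147483647 (~2 GiB), regardless of heap size
    System.out.println(newLimit); // 8589934592 (8 GiB)
  }
}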

hadoop-mapreduce-project/CHANGES.txt

@@ -35,6 +35,9 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the update of AM-RM
     tokens when they roll-over. (Jason Lowe via vinodkv)
+    MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
+    (Gera Shegalov via jlowe)
 Release 2.6.0 - 2014-11-18
   INCOMPATIBLE CHANGES

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -93,8 +93,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
-  private final long memoryLimit;
+  @VisibleForTesting
+  final long memoryLimit;
   private long usedMemory;
   private long commitMemory;
   private final long maxSingleShuffleLimit;
@@ -167,11 +169,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit =
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
+    this.memoryLimit = (long)(jobConf.getLong(
+        MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
     final float singleShuffleMemoryLimitPercent =
@@ -201,7 +202,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
       throw new RuntimeException("Invalid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit should be less than mergeThreshold "
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
           + "mergeThreshold: " + this.mergeThreshold);
     }
@@ -667,24 +668,26 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
   }
+  @VisibleForTesting
+  final long getMaxInMemReduceLimit() {
+    final float maxRedPer =
+        jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      throw new RuntimeException(maxRedPer + ": "
+          + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT
+          + " must be a float between 0 and 1.0");
+    }
+    return (long)(memoryLimit * maxRedPer);
+  }
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
       List<CompressAwarePath> onDiskMapOutputs
       ) throws IOException {
-    LOG.info("finalMerge called with " +
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " +
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-          maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
+    LOG.info("finalMerge called with " +
+        inMemoryMapOutputs.size() + " in-memory map-outputs and " +
+        onDiskMapOutputs.size() + " on-disk map-outputs");
+    final long maxInMemReduce = getMaxInMemReduceLimit();
     // merge config params
     Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
     Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
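To see the patched path end to end, here is a simplified standalone sketch (not the real MergeManagerImpl fields; the 8 GiB total and the 1.0 fractions mirror the values the new unit test below configures):

// Simplified mirror of the patched computation; names and values are illustrative.
public class PatchedLimitSketch {
  public static void main(String[] args) {
    long totalBytes = 8L * 1024 * 1024 * 1024; // MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES
    float maxInMemCopyUse = 1.0f;              // MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT
    float maxRedPer = 1.0f;                    // MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT

    // memoryLimit is no longer clamped to Integer.MAX_VALUE.
    long memoryLimit = (long) (totalBytes * maxInMemCopyUse);
    // What the new getMaxInMemReduceLimit() helper derives from memoryLimit.
    long maxInMemReduce = (long) (memoryLimit * maxRedPer);

    System.out.println(memoryLimit > Integer.MAX_VALUE);    // true
    System.out.println(maxInMemReduce > Integer.MAX_VALUE); // true
  }
}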

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

@@ -260,4 +260,33 @@ public class TestMergeManager {
     }
   }
+  @Test
+  public void testLargeMemoryLimits() throws Exception {
+    final JobConf conf = new JobConf();
+    // Xmx in production
+    conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        8L * 1024 * 1024 * 1024);
+    // M1 = Xmx fraction for map outputs
+    conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
+    // M2 = max M1 fraction for a single map output
+    conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
+    // M3 = M1 fraction at which in-memory merge is triggered
+    conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
+    // M4 = M1 fraction of map outputs remaining in memory for a reduce
+    conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
+    final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
+        null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
+        null, null, null, null, null, new MROutputFiles());
+    assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
+        mgr.memoryLimit > Integer.MAX_VALUE);
+    final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
+    assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
+        maxInMemReduce > Integer.MAX_VALUE);
+  }
 }
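To exercise the new test locally, running it through Maven surefire from the hadoop-mapreduce-client-core module, for example with mvn test -Dtest=TestMergeManager, should cover it (assuming a standard build of the source tree at this commit).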