MAPREDUCE-6721. mapreduce.reduce.shuffle.memory.limit.percent=0.0 should be legal to enforce shuffle to disk. (Gera Shegalov via ozawa)

This closes #102

(cherry picked from commit 79a7289165)
This commit is contained in:
Tsuyoshi Ozawa 2016-06-22 17:13:50 -07:00
parent 856bc4e28b
commit 3ecbfdfdc2
3 changed files with 21 additions and 2 deletions

View File

@ -178,7 +178,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
final float singleShuffleMemoryLimitPercent =
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
if (singleShuffleMemoryLimitPercent <= 0.0f
if (singleShuffleMemoryLimitPercent < 0.0f
|| singleShuffleMemoryLimitPercent > 1.0f) {
throw new IllegalArgumentException("Invalid value for "
+ MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "

View File

@ -443,7 +443,8 @@
<name>mapreduce.reduce.shuffle.memory.limit.percent</name>
<value>0.25</value>
<description>Expert: Maximum percentage of the in-memory limit that a
single shuffle can consume</description>
single shuffle can consume. Range of valid values is [0.0, 1.0]. If the value
is 0.0 map outputs are shuffled directly to disk.</description>
</property>
<property>

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import org.junit.Assert;
import org.junit.Test;
@ -289,4 +290,21 @@ public class TestMergeManager {
assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
maxInMemReduce > Integer.MAX_VALUE);
}
@Test
public void testZeroShuffleMemoryLimitPercent() throws Exception {
final JobConf jobConf = new JobConf();
jobConf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f);
final MergeManager<Text, Text> mgr =
new MergeManagerImpl<>(null, jobConf, mock(LocalFileSystem.class),
null, null, null, null, null, null, null, null, null, null,
new MROutputFiles());
final long mapOutputSize = 10;
final int fetcher = 1;
final MapOutput<Text, Text> mapOutput = mgr.reserve(
TaskAttemptID.forName("attempt_0_1_m_1_1"),
mapOutputSize, fetcher);
assertEquals("Tiny map outputs should be shuffled to disk", "DISK",
mapOutput.getDescription());
}
}