MAPREDUCE-3756. Made single shuffle limit configurable. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1238721 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 2012-01-31 18:08:11 +00:00
parent 8dfef7d2dc
commit a8b6ee0491
6 changed files with 30 additions and 7 deletions

CHANGES.txt

@@ -246,6 +246,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
     on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
 
+    MAPREDUCE-3756. Made single shuffle limit configurable. (Hitesh Shah via
+    acmurthy)
+
   BUG FIXES
 
     MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

MRJobConfig.java

@@ -228,7 +228,10 @@ public interface MRJobConfig {
   public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
 
-  public static final String SHUFFLE_MERGE_EPRCENT = "mapreduce.reduce.shuffle.merge.percent";
+  public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
+    = "mapreduce.reduce.shuffle.memory.limit.percent";
+
+  public static final String SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent";
 
   public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent";
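
Note: a minimal sketch of driving the new knob from job-setup code; the 0.15 value is illustrative, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ShuffleLimitExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap a single in-memory shuffle segment at 15% of the shuffle
        // buffer instead of the 0.25 default; MergeManager rejects values
        // outside (0.0, 1.0].
        conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.15f);
      }
    }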

MergeManager.java

@@ -46,7 +46,6 @@ import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
@@ -68,7 +67,8 @@ public class MergeManager<K, V> {
   /* Maximum percentage of the in-memory limit that a single shuffle can
    * consume*/
-  private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
+  private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
+    = 0.25f;
 
   private final TaskAttemptID reduceId;
@@ -169,12 +169,22 @@ public class MergeManager<K, V> {
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
 
+    final float singleShuffleMemoryLimitPercent =
+        jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
+            DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
     this.maxSingleShuffleLimit =
-      (long)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
+      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
     this.memToMemMergeOutputsThreshold =
             jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
     this.mergeThreshold = (long)(this.memoryLimit *
-                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_EPRCENT,
+                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
                                            0.90f));
     LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
              "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +

ConfigUtil.java

@@ -355,7 +355,7 @@ public class ConfigUtil {
     Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent",
       new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT});
     Configuration.addDeprecation("mapred.job.shuffle.merge.percent",
-      new String[] {MRJobConfig.SHUFFLE_MERGE_EPRCENT});
+      new String[] {MRJobConfig.SHUFFLE_MERGE_PERCENT});
     Configuration.addDeprecation("mapred.max.reduce.failures.percent",
       new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT});
     Configuration.addDeprecation("mapred.reduce.child.env",

mapred-default.xml

@@ -517,6 +517,13 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.reduce.shuffle.memory.limit.percent</name>
+  <value>0.25</value>
+  <description>Expert: Maximum percentage of the in-memory limit that a
+  single shuffle can consume</description>
+</property>
+
 <property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
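
Note: per the MergeManager validation above, overriding this default with a value outside (0.0, 1.0] fails fast when the reduce task builds its MergeManager; a sketch of such a misconfiguration (the 1.5 value is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class BadShuffleLimit {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 1.5f);
        // A reduce task reading this configuration throws:
        //   IllegalArgumentException: Invalid value for
        //   mapreduce.reduce.shuffle.memory.limit.percent: 1.5
      }
    }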

TestReduceFetchFromPartialMem.java

@@ -89,7 +89,7 @@ public class TestReduceFetchFromPartialMem extends TestCase {
     job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
     job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
     job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
-    job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0");
+    job.set(JobContext.SHUFFLE_MERGE_PERCENT, "1.0");
     Counters c = runJob(job);
     final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
     final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();