Merge -c 1238721 from trunk to branch-0.23 to fix MAPREDUCE-3756. Made single shuffle limit configurable.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1238723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-01-31 18:09:39 +00:00
parent 2c8258c98c
commit 85520b1bc4
6 changed files with 30 additions and 7 deletions

View File

@ -178,6 +178,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes MAPREDUCE-3360. Added information about lost/rebooted/decommissioned nodes
on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv) on the webapps. (Bhallamudi Venkata Siva Kamesh and Jason Lowe via vinodkv)
MAPREDUCE-3756. Made single shuffle limit configurable. (Hitesh Shah via
acmurthy)
BUG FIXES BUG FIXES
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
ResourceUsageMatcher. (amarrk) ResourceUsageMatcher. (amarrk)

View File

@ -228,7 +228,10 @@ public interface MRJobConfig {
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
public static final String SHUFFLE_MERGE_EPRCENT = "mapreduce.reduce.shuffle.merge.percent"; public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
= "mapreduce.reduce.shuffle.memory.limit.percent";
public static final String SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent";
public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent"; public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent";

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Reader; import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.Merger.Segment;
@ -68,7 +67,8 @@ public class MergeManager<K, V> {
/* Maximum percentage of the in-memory limit that a single shuffle can /* Maximum percentage of the in-memory limit that a single shuffle can
* consume*/ * consume*/
private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f; private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
= 0.25f;
private final TaskAttemptID reduceId; private final TaskAttemptID reduceId;
@ -169,12 +169,22 @@ public class MergeManager<K, V> {
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
final float singleShuffleMemoryLimitPercent =
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
if (singleShuffleMemoryLimitPercent <= 0.0f
|| singleShuffleMemoryLimitPercent > 1.0f) {
throw new IllegalArgumentException("Invalid value for "
+ MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+ singleShuffleMemoryLimitPercent);
}
this.maxSingleShuffleLimit = this.maxSingleShuffleLimit =
(long)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION); (long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold = this.memToMemMergeOutputsThreshold =
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
this.mergeThreshold = (long)(this.memoryLimit * this.mergeThreshold = (long)(this.memoryLimit *
jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_EPRCENT, jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT,
0.90f)); 0.90f));
LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " + LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " + "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +

View File

@ -355,7 +355,7 @@ public class ConfigUtil {
Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent", Configuration.addDeprecation("mapred.job.shuffle.input.buffer.percent",
new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT}); new String[] {MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT});
Configuration.addDeprecation("mapred.job.shuffle.merge.percent", Configuration.addDeprecation("mapred.job.shuffle.merge.percent",
new String[] {MRJobConfig.SHUFFLE_MERGE_EPRCENT}); new String[] {MRJobConfig.SHUFFLE_MERGE_PERCENT});
Configuration.addDeprecation("mapred.max.reduce.failures.percent", Configuration.addDeprecation("mapred.max.reduce.failures.percent",
new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT}); new String[] {MRJobConfig.REDUCE_FAILURES_MAXPERCENT});
Configuration.addDeprecation("mapred.reduce.child.env", Configuration.addDeprecation("mapred.reduce.child.env",

View File

@ -517,6 +517,13 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.reduce.shuffle.memory.limit.percent</name>
<value>0.25</value>
<description>Expert: Maximum percentage of the in-memory limit that a
single shuffle can consume</description>
</property>
<property> <property>
<name>mapreduce.reduce.markreset.buffer.percent</name> <name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value> <value>0.0</value>

View File

@ -89,7 +89,7 @@ public class TestReduceFetchFromPartialMem extends TestCase {
job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m"); job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20); job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14"); job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0"); job.set(JobContext.SHUFFLE_MERGE_PERCENT, "1.0");
Counters c = runJob(job); Counters c = runJob(job);
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter(); final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter(); final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();