MAPREDUCE-5130. Add missing job config options to mapred-default.xml (Ray Chiang via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619631 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-08-21 23:54:38 +00:00
parent fab20109ae
commit 53da85efaf
7 changed files with 121 additions and 132 deletions

View File

@ -31,6 +31,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with
fallback. (Todd Lipcon via kasha)
MAPREDUCE-5130. Add missing job config options to mapred-default.xml
(Ray Chiang via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -151,7 +151,9 @@ public class JobConf extends Configuration {
/**
* A value which if set for memory related configuration options,
* indicates that the options are turned off.
* Deprecated because it makes no sense in the context of MR2.
*/
@Deprecated
public static final long DISABLED_MEMORY_LIMIT = -1L;
/**
@ -1809,27 +1811,19 @@ public class JobConf extends Configuration {
* Get memory required to run a map task of the job, in MB.
*
* If a value is specified in the configuration, it is returned.
* Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
* Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
* <p/>
* For backward compatibility, if the job configuration sets the
* key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
* from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
* after converting it from bytes to MB.
* @return memory required to run a map task of the job, in MB,
* or {@link #DISABLED_MEMORY_LIMIT} if unset.
*/
public long getMemoryForMapTask() {
long value = getDeprecatedMemoryValue();
if (value == DISABLED_MEMORY_LIMIT) {
value = normalizeMemoryConfigValue(
getLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY,
DISABLED_MEMORY_LIMIT));
}
// In case that M/R 1.x applications use the old property name
if (value == DISABLED_MEMORY_LIMIT) {
value = normalizeMemoryConfigValue(
getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
DISABLED_MEMORY_LIMIT));
if (value < 0) {
return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
JobContext.DEFAULT_MAP_MEMORY_MB);
}
return value;
}
@ -1844,27 +1838,19 @@ public class JobConf extends Configuration {
* Get memory required to run a reduce task of the job, in MB.
*
* If a value is specified in the configuration, it is returned.
* Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
* Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
* <p/>
* For backward compatibility, if the job configuration sets the
* key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
* from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
* after converting it from bytes to MB.
* @return memory required to run a reduce task of the job, in MB,
* or {@link #DISABLED_MEMORY_LIMIT} if unset.
* @return memory required to run a reduce task of the job, in MB.
*/
public long getMemoryForReduceTask() {
long value = getDeprecatedMemoryValue();
if (value == DISABLED_MEMORY_LIMIT) {
value = normalizeMemoryConfigValue(
getLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY,
DISABLED_MEMORY_LIMIT));
}
// In case that M/R 1.x applications use the old property name
if (value == DISABLED_MEMORY_LIMIT) {
value = normalizeMemoryConfigValue(
getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
DISABLED_MEMORY_LIMIT));
if (value < 0) {
return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
JobContext.DEFAULT_REDUCE_MEMORY_MB);
}
return value;
}
@ -1876,8 +1862,7 @@ public class JobConf extends Configuration {
private long getDeprecatedMemoryValue() {
long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
DISABLED_MEMORY_LIMIT);
oldValue = normalizeMemoryConfigValue(oldValue);
if (oldValue != DISABLED_MEMORY_LIMIT) {
if (oldValue > 0) {
oldValue /= (1024*1024);
}
return oldValue;
@ -1921,39 +1906,6 @@ public class JobConf extends Configuration {
return val;
}
/**
* Compute the number of slots required to run a single map task-attempt
* of this job.
* @param slotSizePerMap cluster-wide value of the amount of memory required
* to run a map-task
* @return the number of slots required to run a single map task-attempt
* 1 if memory parameters are disabled.
*/
int computeNumSlotsPerMap(long slotSizePerMap) {
if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
(getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
return 1;
}
return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
}
/**
* Compute the number of slots required to run a single reduce task-attempt
* of this job.
* @param slotSizePerReduce cluster-wide value of the amount of memory
* required to run a reduce-task
* @return the number of slots required to run a single reduce task-attempt
* 1 if memory parameters are disabled
*/
int computeNumSlotsPerReduce(long slotSizePerReduce) {
if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
(getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
return 1;
}
return
(int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
}
/**
* Find a jar that contains a class of the same name, if any.
* It will return a jar file, even if that is not the first thing
@ -1975,14 +1927,12 @@ public class JobConf extends Configuration {
* set for map and reduce tasks of a job, in MB.
* <p/>
* For backward compatibility, if the job configuration sets the
* key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
* from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
* key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned.
* Otherwise, this method will return the larger of the values returned by
* {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
* after converting them into bytes.
*
* @return Memory required to run a task of this job, in bytes,
* or {@link #DISABLED_MEMORY_LIMIT}, if unset.
* @return Memory required to run a task of this job, in bytes.
* @see #setMaxVirtualMemoryForTask(long)
* @deprecated Use {@link #getMemoryForMapTask()} and
* {@link #getMemoryForReduceTask()}
@ -1993,15 +1943,8 @@ public class JobConf extends Configuration {
"getMaxVirtualMemoryForTask() is deprecated. " +
"Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
value = normalizeMemoryConfigValue(value);
if (value == DISABLED_MEMORY_LIMIT) {
value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
value = normalizeMemoryConfigValue(value);
if (value != DISABLED_MEMORY_LIMIT) {
value *= 1024*1024;
}
}
long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
return value;
}
@ -2027,9 +1970,8 @@ public class JobConf extends Configuration {
public void setMaxVirtualMemoryForTask(long vmem) {
LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
"Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
if (vmem < 0) {
throw new IllegalArgumentException("Task memory allocation may not be < 0");
}
if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {

View File

@ -278,6 +278,8 @@ public class ConfigUtil {
MRJobConfig.TASK_DEBUGOUT_LINES),
new DeprecationDelta("mapred.merge.recordsBeforeProgress",
MRJobConfig.RECORDS_BEFORE_PROGRESS),
new DeprecationDelta("mapred.merge.recordsBeforeProgress",
MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS),
new DeprecationDelta("mapred.skip.attempts.to.start.skipping",
MRJobConfig.SKIP_START_ATTEMPTS),
new DeprecationDelta("mapred.task.id",

View File

@ -397,55 +397,41 @@
</property>
<property>
<name>mapreduce.tasktracker.map.tasks.maximum</name>
<value>2</value>
<description>The maximum number of map tasks that will be run
simultaneously by a task tracker.
<name>mapreduce.map.memory.mb</name>
<value>1024</value>
<description>The amount of memory to request from the scheduler for each
map task.
</description>
</property>
<property>
<name>mapreduce.tasktracker.reduce.tasks.maximum</name>
<value>2</value>
<description>The maximum number of reduce tasks that will be run
simultaneously by a task tracker.
<name>mapreduce.map.cpu.vcores</name>
<value>1</value>
<description>The number of virtual cores to request from the scheduler for
each map task.
</description>
</property>
<property>
<name>mapreduce.jobtracker.retiredjobs.cache.size</name>
<value>1000</value>
<description>The number of retired job status to keep in the cache.
<name>mapreduce.reduce.memory.mb</name>
<value>1024</value>
<description>The amount of memory to request from the scheduler for each
reduce task.
</description>
</property>
<property>
<name>mapreduce.tasktracker.outofband.heartbeat</name>
<value>false</value>
<description>Expert: Set this to true to let the tasktracker send an
out-of-band heartbeat on task-completion for better latency.
</description>
</property>
<property>
<name>mapreduce.jobtracker.jobhistory.lru.cache.size</name>
<value>5</value>
<description>The number of job history files loaded in memory. The jobs are
loaded when they are first accessed. The cache is cleared based on LRU.
</description>
</property>
<property>
<name>mapreduce.jobtracker.instrumentation</name>
<value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
<description>Expert: The instrumentation class to associate with each JobTracker.
<name>mapreduce.reduce.cpu.vcores</name>
<value>1</value>
<description>The number of virtual cores to request from the scheduler for
each reduce task.
</description>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx200m</value>
<description>Java opts for the task tracker child processes.
<description>Java opts for the task processes.
The following symbol, if present, will be interpolated: @taskid@ is replaced
by current TaskID. Any other occurrences of '@' will go unchanged.
For example, to enable verbose gc logging to a file named for the taskid in
@ -459,17 +445,55 @@
</description>
</property>
<!-- This is commented out so that it won't override mapred.child.java.opts.
<property>
<name>mapreduce.map.java.opts</name>
<value></value>
<description>Java opts only for the child processes that are maps. If set,
this will be used instead of mapred.child.java.opts.
</description>
</property>
-->
<!-- This is commented out so that it won't override mapred.child.java.opts.
<property>
<name>mapreduce.reduce.java.opts</name>
<value></value>
<description>Java opts only for the child processes that are reduces. If set,
this will be used instead of mapred.child.java.opts.
</description>
</property>
-->
<property>
<name>mapred.child.env</name>
<value></value>
<description>User added environment variables for the task tracker child
processes. Example :
<description>User added environment variables for the task processes.
Example :
1) A=foo This will set the env variable A to foo
2) B=$B:c This will inherit nodemanager's B env variable on Unix.
3) B=%B%;c This will inherit nodemanager's B env variable on Windows.
</description>
</property>
<!-- This is commented out so that it won't override mapred.child.env.
<property>
<name>mapreduce.map.env</name>
<value></value>
<description>User added environment variables for the map task processes.
</description>
</property>
-->
<!-- This is commented out so that it won't override mapred.child.env.
<property>
<name>mapreduce.reduce.env</name>
<value></value>
<description>User added environment variables for the reduce task processes.
</description>
</property>
-->
<property>
<name>mapreduce.admin.user.env</name>
<value></value>
@ -754,12 +778,21 @@
</description>
</property>
<property>
<name>mapreduce.jobtracker.maxtasks.perjob</name>
<value>-1</value>
<description>The maximum number of tasks for a single job.
A value of -1 indicates that there is no maximum. </description>
</property>
<property>
<name>mapreduce.input.lineinputformat.linespermap</name>
<value>1</value>
<description>When using NLineInputFormat, the number of lines of input data
to include in each split.</description>
</property>
<property>
<name>mapreduce.client.submit.file.replication</name>
@ -1304,6 +1337,14 @@
</description>
</property>
<property>
<name>mapreduce.task.combine.progress.records</name>
<value>10000</value>
<description> The number of records to process during combine output collection
before sending a progress notification.
</description>
</property>
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
<value>0.05</value>

View File

@ -140,18 +140,21 @@ public class TestJobConf {
conf.setQueueName("qname");
assertEquals("qname", conf.getQueueName());
assertEquals(1, conf.computeNumSlotsPerMap(100L));
assertEquals(1, conf.computeNumSlotsPerReduce(100L));
conf.setMemoryForMapTask(100 * 1000);
assertEquals(1000, conf.computeNumSlotsPerMap(100L));
assertEquals(100 * 1000, conf.getMemoryForMapTask());
conf.setMemoryForReduceTask(1000 * 1000);
assertEquals(1000, conf.computeNumSlotsPerReduce(1000L));
assertEquals(1000 * 1000, conf.getMemoryForReduceTask());
assertEquals(-1, conf.getMaxPhysicalMemoryForTask());
assertEquals("The variable key is no longer used.",
JobConf.deprecatedString("key"));
// make sure mapreduce.map|reduce.java.opts are not set by default
// so that they won't override mapred.child.java.opts
assertEquals("mapreduce.map.java.opts should not be set by default",
null, conf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS));
assertEquals("mapreduce.reduce.java.opts should not be set by default",
null, conf.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS));
}
/**

View File

@ -108,6 +108,11 @@ public class TestJobConf {
JobConf configuration = new JobConf();
configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-3");
Assert.assertEquals(MRJobConfig.DEFAULT_MAP_MEMORY_MB,
configuration.getMemoryForMapTask());
Assert.assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
configuration.getMemoryForReduceTask());
configuration.set(MRJobConfig.MAP_MEMORY_MB, "4");
configuration.set(MRJobConfig.REDUCE_MEMORY_MB, "5");
Assert.assertEquals(4, configuration.getMemoryForMapTask());
@ -116,23 +121,16 @@ public class TestJobConf {
}
/**
* Test that negative values for all memory configuration properties causes
* APIs to disable memory limits
* Test that negative values for new configuration keys get passed through.
*/
@Test
public void testNegativeValuesForMemoryParams() {
JobConf configuration = new JobConf();
configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-4");
configuration.set(MRJobConfig.MAP_MEMORY_MB, "-5");
configuration.set(MRJobConfig.REDUCE_MEMORY_MB, "-6");
Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
configuration.getMemoryForMapTask());
Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
configuration.getMemoryForReduceTask());
Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
configuration.getMaxVirtualMemoryForTask());
Assert.assertEquals(-5, configuration.getMemoryForMapTask());
Assert.assertEquals(-6, configuration.getMemoryForReduceTask());
}
/**

View File

@ -97,10 +97,10 @@ public class TestHighRamJob {
// check if the high ram properties are not set
assertEquals(expectedMapMB,
simulatedConf.getLong(MRJobConfig.MAP_MEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT));
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
assertEquals(expectedReduceMB,
simulatedConf.getLong(MRJobConfig.REDUCE_MEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT));
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB));
}
/**
@ -114,10 +114,10 @@ public class TestHighRamJob {
// test : check high ram emulation disabled
gridmixConf.setBoolean(GridmixJob.GRIDMIX_HIGHRAM_EMULATION_ENABLE, false);
testHighRamConfig(10, 20, 5, 10, JobConf.DISABLED_MEMORY_LIMIT,
JobConf.DISABLED_MEMORY_LIMIT,
JobConf.DISABLED_MEMORY_LIMIT,
JobConf.DISABLED_MEMORY_LIMIT, gridmixConf);
testHighRamConfig(10, 20, 5, 10, MRJobConfig.DEFAULT_MAP_MEMORY_MB,
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB,
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, gridmixConf);
// test : check with high ram enabled (default) and no scaling
gridmixConf = new Configuration();