diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e4f023e03c7..1e3ae4f5bb6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -9,6 +9,9 @@ Release 2.0.3-alpha - Unreleased NEW FEATURES + MAPREDUCE-4520. Added support for MapReduce applications to request for + CPU cores along-with memory post YARN-2. (acmurthy) + IMPROVEMENTS MAPREDUCE-3678. The Map tasks logs should have the value of input diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index b20457f862d..7306cda792b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1050,6 +1050,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB); + long sysCPUSizeForUberSlot = + conf.getInt(MRJobConfig.MR_AM_CPU_VCORES, + MRJobConfig.DEFAULT_MR_AM_CPU_VCORES); + boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps); @@ -1061,6 +1065,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0)) <= sysMemSizeForUberSlot) || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)); + boolean smallCpu = + ( + Math.max( + conf.getInt(MRJobConfig.MAP_CPU_VCORES, 1), + conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, 1)) < + sysCPUSizeForUberSlot + ); boolean notChainJob = !isChainJob(conf); // User has overall veto power over uberization, or user can modify @@ -1071,7 +1082,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks // and thus requires sequential execution. isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks - && smallInput && smallMemory && notChainJob && isValidUberMaxReduces; + && smallInput && smallMemory && smallCpu + && notChainJob && isValidUberMaxReduces; if (isUber) { LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+" diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index f7aedbbed9f..31ea2f2ac1a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -527,7 +527,10 @@ public abstract class TaskAttemptImpl implements //TODO:create the resource reqt for this Task attempt this.resourceCapability = recordFactory.newRecordInstance(Resource.class); - this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType())); + this.resourceCapability.setMemory( + getMemoryRequired(conf, taskId.getTaskType())); + this.resourceCapability.setVirtualCores( + getCpuRequired(conf, taskId.getTaskType())); this.dataLocalHosts = dataLocalHosts; RackResolver.init(conf); @@ -551,6 +554,21 @@ public abstract class TaskAttemptImpl implements return memory; } + private int getCpuRequired(Configuration conf, TaskType taskType) { + int vcores = 1; + if (taskType == TaskType.MAP) { + vcores = + conf.getInt(MRJobConfig.MAP_CPU_VCORES, + MRJobConfig.DEFAULT_MAP_CPU_VCORES); + } else if (taskType == TaskType.REDUCE) { + vcores = + conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, + MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); + } + + return vcores; + } + /** * Create a {@link LocalResource} record with all the given parameters. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index cfc50af6d25..cb11711356b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -181,7 +181,8 @@ public interface MRJobConfig { public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final int DEFAULT_MAP_MEMORY_MB = 1024; - public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb"; + public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; + public static final int DEFAULT_MAP_CPU_VCORES = 1; public static final String MAP_ENV = "mapreduce.map.env"; @@ -225,11 +226,12 @@ public interface MRJobConfig { public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size"; - public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb"; - public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb"; public static final int DEFAULT_REDUCE_MEMORY_MB = 1024; + public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; + public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; @@ -350,6 +352,11 @@ public interface MRJobConfig { MR_AM_PREFIX+"resource.mb"; public static final int DEFAULT_MR_AM_VMEM_MB = 1536; + /** The number of virtual cores the MR app master needs.*/ + public static final String MR_AM_CPU_VCORES = + MR_AM_PREFIX+"resource.cpu-vcores"; + public static final int DEFAULT_MR_AM_CPU_VCORES = 1; + /** Command line arguments passed to the MR app master.*/ public static final String MR_AM_COMMAND_OPTS = MR_AM_PREFIX+"command-opts"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 34e8330038c..dd52b80ad42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -474,6 +474,22 @@ + + mapreduce.map.cpu.vcores + 1 + + The number of virtual cores required for each map task. + + + + + mapreduce.reduce.cpu.vcores + 1 + + The number of virtual cores required for each reduce task. + + + mapreduce.reduce.merge.inmem.threshold 1000 @@ -1340,6 +1356,14 @@ The amount of memory the MR AppMaster needs. + + yarn.app.mapreduce.am.resource.cpu-vcores + 1 + + The number of virtual CPU cores the MR AppMaster needs. + + + CLASSPATH for MR applications. A comma-separated list of CLASSPATH entries diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 1b684364f51..c3f6b75b756 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -324,8 +324,16 @@ public class YARNRunner implements ClientProtocol { // Setup resource requirements Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, - MRJobConfig.DEFAULT_MR_AM_VMEM_MB)); + capability.setMemory( + conf.getInt( + MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB + ) + ); + capability.setVirtualCores( + conf.getInt( + MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES + ) + ); LOG.debug("AppMaster capability = " + capability); // Setup LocalResources