From 8e5ce8055e77e752edded969336c3e7591f291e9 Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Wed, 20 Mar 2019 17:46:35 -0700 Subject: [PATCH] YARN-9271. Backport YARN-6927 for resource type support in MapReduce --- .../v2/app/job/impl/TaskAttemptImpl.java | 141 ++++++- .../mapreduce/TestMapreduceConfigFields.java | 11 + .../v2/app/job/impl/TestTaskAttempt.java | 365 +++++++++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 68 +++- .../org/apache/hadoop/mapred/YARNRunner.java | 86 ++++- .../apache/hadoop/mapred/TestYARNRunner.java | 167 ++++++++ .../yarn/util/resource/ResourceUtils.java | 44 +++ 7 files changed, 853 insertions(+), 29 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index dfc3adb29cc..3f37d4ddf27 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; +import static org.apache.commons.lang.StringUtils.isEmpty; + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -139,6 +142,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -664,12 +669,8 @@ public TaskAttemptImpl(TaskId taskId, int i, this.jobFile = jobFile; this.partition = partition; - //TODO:create the resource reqt for this Task attempt this.resourceCapability = recordFactory.newRecordInstance(Resource.class); - this.resourceCapability.setMemorySize( - getMemoryRequired(conf, taskId.getTaskType())); - this.resourceCapability.setVirtualCores( - getCpuRequired(conf, taskId.getTaskType())); + populateResourceCapability(taskId.getTaskType()); this.dataLocalHosts = resolveHosts(dataLocalHosts); RackResolver.init(conf); @@ -701,21 +702,133 @@ private int getMemoryRequired(Configuration conf, TaskType taskType) { return memory; } + private void populateResourceCapability(TaskType taskType) { + String resourceTypePrefix = + getResourceTypePrefix(taskType); + boolean memorySet = false; + boolean cpuVcoresSet = false; + if (resourceTypePrefix != null) { + List resourceRequests = + ResourceUtils.getRequestedResourcesFromConfig(conf, + resourceTypePrefix); + for (ResourceInformation resourceRequest : resourceRequests) { + String resourceName = resourceRequest.getName(); + if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) || + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals( + resourceName)) { + if (memorySet) { + throw new IllegalArgumentException( + "Only one of the following keys " + + "can be specified for a single job: " + + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " + + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY); + } + String units = isEmpty(resourceRequest.getUnits()) ? + ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) : + resourceRequest.getUnits(); + this.resourceCapability.setMemorySize( + UnitsConversionUtil.convert(units, "Mi", + resourceRequest.getValue())); + memorySet = true; + String memoryKey = getMemoryKey(taskType); + if (memoryKey != null && conf.get(memoryKey) != null) { + LOG.warn("Configuration " + resourceTypePrefix + resourceName + + "=" + resourceRequest.getValue() + resourceRequest.getUnits() + + " is overriding the " + memoryKey + "=" + conf.get(memoryKey) + + " configuration"); + } + } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals( + resourceName)) { + this.resourceCapability.setVirtualCores( + (int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "", + resourceRequest.getValue())); + cpuVcoresSet = true; + String cpuKey = getCpuVcoresKey(taskType); + if (cpuKey != null && conf.get(cpuKey) != null) { + LOG.warn("Configuration " + resourceTypePrefix + + MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" + + resourceRequest.getValue() + resourceRequest.getUnits() + + " is overriding the " + cpuKey + "=" + + conf.get(cpuKey) + " configuration"); + } + } else { + ResourceInformation resourceInformation = + this.resourceCapability.getResourceInformation(resourceName); + resourceInformation.setUnits(resourceRequest.getUnits()); + resourceInformation.setValue(resourceRequest.getValue()); + this.resourceCapability.setResourceInformation(resourceName, + resourceInformation); + } + } + } + if (!memorySet) { + this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType)); + } + if (!cpuVcoresSet) { + this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType)); + } + } + + private String getCpuVcoresKey(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.MAP_CPU_VCORES; + case REDUCE: + return MRJobConfig.REDUCE_CPU_VCORES; + default: + return null; + } + } + + private String getMemoryKey(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.MAP_MEMORY_MB; + case REDUCE: + return MRJobConfig.REDUCE_MEMORY_MB; + default: + return null; + } + } + + private Integer getCpuVcoreDefault(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.DEFAULT_MAP_CPU_VCORES; + case REDUCE: + return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES; + default: + return null; + } + } + private int getCpuRequired(Configuration conf, TaskType taskType) { int vcores = 1; - if (taskType == TaskType.MAP) { - vcores = - conf.getInt(MRJobConfig.MAP_CPU_VCORES, - MRJobConfig.DEFAULT_MAP_CPU_VCORES); - } else if (taskType == TaskType.REDUCE) { - vcores = - conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, - MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); + String cpuVcoreKey = getCpuVcoresKey(taskType); + if (cpuVcoreKey != null) { + Integer defaultCpuVcores = getCpuVcoreDefault(taskType); + if (null == defaultCpuVcores) { + defaultCpuVcores = vcores; + } + vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores); } - return vcores; } + private String getResourceTypePrefix(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX; + case REDUCE: + return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX; + default: + LOG.info("TaskType " + taskType + + " does not support custom resource types - this support can be " + + "added in " + getClass().getSimpleName()); + return null; + } + } + /** * Create a {@link LocalResource} record with all the given parameters. * The NM that hosts AM container will upload resources to shared cache. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java index 096cec937d4..f469aad1e6b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java @@ -78,6 +78,17 @@ public void initializeMemberVariables() { xmlPropsToSkipCompare.add("mapreduce.local.clientfactory.class.name"); xmlPropsToSkipCompare.add("mapreduce.jobtracker.system.dir"); xmlPropsToSkipCompare.add("mapreduce.jobtracker.staging.root.dir"); + + // Resource type related properties are only prefixes, + // they need to be postfixed with the resource name + // in order to take effect. + // There is nothing to be added to mapred-default.xml + configurationPropsToSkipCompare.add( + MRJobConfig.MR_AM_RESOURCE_PREFIX); + configurationPropsToSkipCompare.add( + MRJobConfig.MAP_RESOURCE_TYPE_PREFIX); + configurationPropsToSkipCompare.add( + MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 60a2177a88c..e055798640d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -28,14 +28,21 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import com.google.common.base.Supplier; +import org.junit.After; import org.junit.Assert; +import org.junit.BeforeClass; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -43,6 +50,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; +import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -83,24 +91,36 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.google.common.collect.ImmutableList; + @SuppressWarnings({"unchecked", "rawtypes"}) public class TestTaskAttempt{ - + + private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource"; + static public class StubbedFS extends RawLocalFileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { @@ -108,6 +128,63 @@ public FileStatus getFileStatus(Path f) throws IOException { } } + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("\n" + + " \n" + + " yarn.resource-types\n" + + " a-custom-resource\n" + + " \n" + + " \n" + + " yarn.resource-types.a-custom-resource.units\n" + + " G\n" + + " \n" + + "\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + + private static class TestAppender extends AppenderSkeleton { + + private final List logEvents = new CopyOnWriteArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + public void close() { + } + + @Override + protected void append(LoggingEvent arg0) { + logEvents.add(arg0); + } + + private List getLogEvents() { + return logEvents; + } + } + + @BeforeClass + public static void setupBeforeClass() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + + @After + public void tearDown() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + @Test public void testMRAppHistoryForMap() throws Exception { MRApp app = new FailingAttemptsMRApp(1, 0); @@ -329,17 +406,18 @@ public void verifyMillisCounters(Resource containerResource, private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) { Clock clock = SystemClock.getInstance(); - return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock); + return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, + clock, new JobConf()); } private TaskAttemptImpl createMapTaskAttemptImplForTest( - EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) { + EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, + Clock clock, JobConf jobConf) { ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptListener taListener = mock(TaskAttemptListener.class); Path jobFile = mock(Path.class); - JobConf jobConf = new JobConf(); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, taskSplitMetaInfo, jobConf, taListener, null, @@ -347,6 +425,20 @@ private TaskAttemptImpl createMapTaskAttemptImplForTest( return taImpl; } + private TaskAttemptImpl createReduceTaskAttemptImplForTest( + EventHandler eventHandler, Clock clock, JobConf jobConf) { + ApplicationId appId = ApplicationId.newInstance(1, 1); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + Path jobFile = mock(Path.class); + TaskAttemptImpl taImpl = + new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + 1, jobConf, taListener, null, + null, clock, null); + return taImpl; + } + private void testMRAppHistory(MRApp app) throws Exception { Configuration conf = new Configuration(); Job job = app.submit(conf); @@ -1423,6 +1515,271 @@ public void testTimeoutWhileFailFinishing() throws Exception { assertFalse("InternalError occurred", eventHandler.internalError); } + @Test + public void testMapperCustomResourceTypes() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo(); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, 7L); + TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler, + taskSplitMetaInfo, clock, jobConf); + ResourceInformation resourceInfo = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getResourceInformation(CUSTOM_RESOURCE_NAME); + assertEquals("Expecting the default unit (G)", + "G", resourceInfo.getUnits()); + assertEquals(7L, resourceInfo.getValue()); + } + + @Test + public void testReducerCustomResourceTypes() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, "3m"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + ResourceInformation resourceInfo = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getResourceInformation(CUSTOM_RESOURCE_NAME); + assertEquals("Expecting the specified unit (m)", + "m", resourceInfo.getUnits()); + assertEquals(3L, resourceInfo.getValue()); + } + + @Test + public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(2048, memorySize); + } + + @Test + public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(2048, memorySize); + } + + @Test + public void testReducerMemoryRequestDefaultMemory() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf()); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize); + } + + @Test + public void testReducerMemoryRequestWithoutUnits() { + Clock clock = SystemClock.getInstance(); + for (String memoryResourceName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + EventHandler eventHandler = mock(EventHandler.class); + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + memoryResourceName, 2048); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(2048, memorySize); + } + } + + @Test + public void testReducerMemoryRequestOverriding() { + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + TestAppender testAppender = new TestAppender(); + final Logger logger = Logger.getLogger(TaskAttemptImpl.class); + try { + logger.addAppender(testAppender); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName, + "3Gi"); + jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(3072, memorySize); + boolean foundLogWarning = false; + for (LoggingEvent e : testAppender.getLogEvents()) { + if (e.getLevel() == Level.WARN && ("Configuration " + + "mapreduce.reduce.resource." + memoryName + "=3Gi is " + + "overriding the mapreduce.reduce.memory.mb=2048 configuration") + .equals(e.getMessage())) { + foundLogWarning = true; + break; + } + } + assertTrue(foundLogWarning); + } finally { + logger.removeAppender(testAppender); + } + } + } + + @Test(expected=IllegalArgumentException.class) + public void testReducerMemoryRequestMultipleName() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName, + "3Gi"); + } + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + } + + @Test + public void testReducerCpuRequestViaMapreduceReduceCpuVcores() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + int vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(3, vCores); + } + + @Test + public void testReducerCpuRequestViaMapreduceReduceResourceVcores() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + int vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(5, vCores); + } + + @Test + public void testReducerCpuRequestDefaultMemory() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf()); + int vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores); + } + + @Test + public void testReducerCpuRequestOverriding() { + TestAppender testAppender = new TestAppender(); + final Logger logger = Logger.getLogger(TaskAttemptImpl.class); + try { + logger.addAppender(testAppender); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7"); + jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(7, vCores); + boolean foundLogWarning = false; + for (LoggingEvent e : testAppender.getLogEvents()) { + if (e.getLevel() == Level.WARN && ("Configuration " + + "mapreduce.reduce.resource.vcores=7 is overriding the " + + "mapreduce.reduce.cpu.vcores=9 configuration" + ).equals(e.getMessage())) { + foundLogWarning = true; + break; + } + } + assertTrue(foundLogWarning); + } finally { + logger.removeAppender(testAppender); + } + } + + private Resource getResourceInfoFromContainerRequest( + TaskAttemptImpl taImpl, EventHandler eventHandler) { + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_SCHEDULE)); + + assertEquals("Task attempt is not in STARTING state", taImpl.getState(), + TaskAttemptState.STARTING); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(captor.capture()); + + List containerRequestEvents = new ArrayList<>(); + for (Event e : captor.getAllValues()) { + if (e instanceof ContainerRequestEvent) { + containerRequestEvents.add((ContainerRequestEvent) e); + } + } + assertEquals("Expected one ContainerRequestEvent after scheduling " + + "task attempt", 1, containerRequestEvents.size()); + + return containerRequestEvents.get(0).getCapability(); + } + + @Test(expected=IllegalArgumentException.class) + public void testReducerCustomResourceTypeWithInvalidUnit() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, "3z"); + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + } + + private void initResourceTypes() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(conf); + } + private void setupTaskAttemptFinishingMonitor( EventHandler eventHandler, JobConf jobConf, AppContext appCtx) { TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index d66612366a7..5a72def63c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -360,12 +360,47 @@ public interface MRJobConfig { public static final String MAP_INPUT_START = "mapreduce.map.input.start"; + /** + * Configuration key for specifying memory requirement for the mapper. + * Kept for backward-compatibility, mapreduce.map.resource.memory + * is the new preferred way to specify this. + */ public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final int DEFAULT_MAP_MEMORY_MB = 1024; + /** + * Configuration key for specifying CPU requirement for the mapper. + * Kept for backward-compatibility, mapreduce.map.resource.vcores + * is the new preferred way to specify this. + */ public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; public static final int DEFAULT_MAP_CPU_VCORES = 1; + /** + * Custom resource names required by the mapper should be + * appended to this prefix, the value's format is {amount}[ ][{unit}]. + * If no unit is defined, the default unit will be used. + * Standard resource names: memory (default unit: Mi), vcores + */ + public static final String MAP_RESOURCE_TYPE_PREFIX = + "mapreduce.map.resource."; + + /** + * Resource type name for CPU vcores. + */ + public static final String RESOURCE_TYPE_NAME_VCORE = "vcores"; + + /** + * Resource type name for memory. + */ + public static final String RESOURCE_TYPE_NAME_MEMORY = "memory"; + + /** + * Alternative resource type name for memory. + */ + public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY = + "memory-mb"; + public static final String MAP_ENV = "mapreduce.map.env"; public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -408,12 +443,31 @@ public interface MRJobConfig { public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size"; + /** + * Configuration key for specifying memory requirement for the reducer. + * Kept for backward-compatibility, mapreduce.reduce.resource.memory + * is the new preferred way to specify this. + */ public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb"; public static final int DEFAULT_REDUCE_MEMORY_MB = 1024; + /** + * Configuration key for specifying CPU requirement for the reducer. + * Kept for backward-compatibility, mapreduce.reduce.resource.vcores + * is the new preferred way to specify this. + */ public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + /** + * Resource names required by the reducer should be + * appended to this prefix, the value's format is {amount}[ ][{unit}]. + * If no unit is defined, the default unit will be used. + * Standard resource names: memory (default unit: Mi), vcores + */ + public static final String REDUCE_RESOURCE_TYPE_PREFIX = + "mapreduce.reduce.resource."; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; @@ -599,7 +653,10 @@ public interface MRJobConfig { public static final String DEFAULT_MR_AM_STAGING_DIR = "/tmp/hadoop-yarn/staging"; - /** The amount of memory the MR app master needs.*/ + /** The amount of memory the MR app master needs. + * Kept for backward-compatibility, yarn.app.mapreduce.am.resource.memory is + * the new preferred way to specify this + */ public static final String MR_AM_VMEM_MB = MR_AM_PREFIX+"resource.mb"; public static final int DEFAULT_MR_AM_VMEM_MB = 1536; @@ -609,6 +666,15 @@ public interface MRJobConfig { MR_AM_PREFIX+"resource.cpu-vcores"; public static final int DEFAULT_MR_AM_CPU_VCORES = 1; + /** + * Resource names required by the MR AM should be + * appended to this prefix, the value's format is {amount}[ ][{unit}]. + * If no unit is defined, the default unit will be used + * Standard resource names: memory (default unit: Mi), vcores + */ + public static final String MR_AM_RESOURCE_PREFIX = + MR_AM_PREFIX + "resource."; + /** Command line arguments passed to the MR app master.*/ public static final String MR_AM_COMMAND_OPTS = MR_AM_PREFIX+"command-opts"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index a23ff34b574..12a307930fc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -18,6 +18,9 @@ package org.apache.hadoop.mapred; +import static org.apache.commons.lang.StringUtils.isEmpty; +import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -93,6 +97,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import com.google.common.annotations.VisibleForTesting; @@ -659,16 +665,76 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( private List generateResourceRequests() throws IOException { Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemorySize( - conf.getInt( - MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB - ) - ); - capability.setVirtualCores( - conf.getInt( - MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES - ) - ); + boolean memorySet = false; + boolean cpuVcoresSet = false; + List resourceRequests = ResourceUtils + .getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX); + for (ResourceInformation resourceReq : resourceRequests) { + String resourceName = resourceReq.getName(); + if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) || + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals( + resourceName)) { + if (memorySet) { + throw new IllegalArgumentException( + "Only one of the following keys " + + "can be specified for a single job: " + + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " + + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY); + } + String units = isEmpty(resourceReq.getUnits()) ? + ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) : + resourceReq.getUnits(); + capability.setMemorySize( + UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue())); + memorySet = true; + if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) { + LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX + + resourceName + "=" + resourceReq.getValue() + + resourceReq.getUnits() + " is overriding the " + + MRJobConfig.MR_AM_VMEM_MB + "=" + + conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration"); + } + } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) { + capability.setVirtualCores( + (int) UnitsConversionUtil.convert(resourceReq.getUnits(), "", + resourceReq.getValue())); + cpuVcoresSet = true; + if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) { + LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX + + resourceName + "=" + resourceReq.getValue() + + resourceReq.getUnits() + " is overriding the " + + MRJobConfig.MR_AM_CPU_VCORES + "=" + + conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration"); + } + } else if (!MRJobConfig.MR_AM_VMEM_MB.equals( + MR_AM_RESOURCE_PREFIX + resourceName) && + !MRJobConfig.MR_AM_CPU_VCORES.equals( + MR_AM_RESOURCE_PREFIX + resourceName)) { + // the "mb", "cpu-vcores" resource types are not processed here + // since the yarn.app.mapreduce.am.resource.mb, + // yarn.app.mapreduce.am.resource.cpu-vcores keys are used for + // backward-compatibility - which is handled after this loop + ResourceInformation resourceInformation = capability + .getResourceInformation(resourceName); + resourceInformation.setUnits(resourceReq.getUnits()); + resourceInformation.setValue(resourceReq.getValue()); + capability.setResourceInformation(resourceName, resourceInformation); + } + } + if (!memorySet) { + capability.setMemorySize( + conf.getInt( + MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB + ) + ); + } + if (!cpuVcoresSet) { + capability.setVirtualCores( + conf.getInt( + MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES + ) + ); + } if (LOG.isDebugEnabled()) { LOG.debug("AppMaster capability = " + capability); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index c79b08e52e4..ecb396e91fd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -32,10 +32,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -43,6 +45,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -69,6 +72,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -96,28 +100,37 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.log4j.Appender; +import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Layout; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.SimpleLayout; import org.apache.log4j.WriterAppender; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.ImmutableList; + /** * Test YarnRunner and make sure the client side plugin works * fine @@ -131,6 +144,53 @@ public class TestYARNRunner { MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0, MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%")); + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("\n" + + " \n" + + " yarn.resource-types\n" + + " a-custom-resource\n" + + " \n" + + " \n" + + " yarn.resource-types.a-custom-resource.units\n" + + " G\n" + + " \n" + + "\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + + private static class TestAppender extends AppenderSkeleton { + + private final List logEvents = new CopyOnWriteArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + public void close() { + } + + @Override + protected void append(LoggingEvent arg0) { + logEvents.add(arg0); + } + + private List getLogEvents() { + return logEvents; + } + } + private YARNRunner yarnRunner; private ResourceMgrDelegate resourceMgrDelegate; private YarnConfiguration conf; @@ -143,6 +203,11 @@ public class TestYARNRunner { private ClientServiceDelegate clientDelegate; private static final String failString = "Rejected job"; + @BeforeClass + public static void setupBeforeClass() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + @Before public void setUp() throws Exception { resourceMgrDelegate = mock(ResourceMgrDelegate.class); @@ -175,6 +240,7 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation) @After public void cleanup() { FileUtil.fullyDelete(testWorkDir); + ResourceUtils.resetResourceTypes(new Configuration()); } @Test(timeout=20000) @@ -884,4 +950,105 @@ public void testSendJobConf() throws IOException { .get("hadoop.tmp.dir").equals("testconfdir")); UserGroupInformation.reset(); } + + @Test + public void testCustomAMRMResourceType() throws Exception { + initResourceTypes(); + String customResourceName = "a-custom-resource"; + + JobConf jobConf = new JobConf(); + + jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX + + customResourceName, 5); + jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + ResourceInformation resourceInformation = resourceRequest.getCapability() + .getResourceInformation(customResourceName); + Assert.assertEquals("Expecting the default unit (G)", + "G", resourceInformation.getUnits()); + Assert.assertEquals(5L, resourceInformation.getValue()); + Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores()); + } + + @Test + public void testAMRMemoryRequest() throws Exception { + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi"); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + long memorySize = resourceRequest.getCapability().getMemorySize(); + Assert.assertEquals(3072, memorySize); + } + } + + @Test + public void testAMRMemoryRequestOverriding() throws Exception { + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + TestAppender testAppender = new TestAppender(); + Logger logger = Logger.getLogger(YARNRunner.class); + logger.addAppender(testAppender); + try { + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi"); + jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + long memorySize = resourceRequest.getCapability().getMemorySize(); + Assert.assertEquals(3072, memorySize); + boolean foundLogWarning = false; + for (LoggingEvent e : testAppender.getLogEvents()) { + if (e.getLevel() == Level.WARN && ("Configuration " + + "yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " + + "overriding the yarn.app.mapreduce.am.resource.mb=2048 " + + "configuration").equals(e.getMessage())) { + foundLogWarning = true; + break; + } + } + assertTrue(foundLogWarning); + } finally { + logger.removeAppender(testAppender); + } + } + } + + private void initResourceTypes() { + Configuration configuration = new Configuration(); + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(configuration); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 65eb5a21c30..3806771acb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -44,7 +44,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; @@ -60,6 +63,8 @@ public class ResourceUtils { private static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); private static final String VCORES = ResourceInformation.VCORES.getName(); + private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN = + Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$"); private static volatile boolean initializedResources = false; private static final Map RESOURCE_NAME_TO_INDEX = @@ -564,4 +569,43 @@ public static List getResourcesTypeInfo() { } return array; } + + /** + * From a given configuration get all entries representing requested + * resources: entries that match the {prefix}{resourceName}={value}[{units}] + * pattern. + * @param configuration The configuration + * @param prefix Keys with this prefix are considered from the configuration + * @return The list of requested resources as described by the configuration + */ + public static List getRequestedResourcesFromConfig( + Configuration configuration, String prefix) { + List result = new ArrayList<>(); + Map customResourcesMap = configuration + .getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$"); + for (Entry resource : customResourcesMap.entrySet()) { + String resourceName = resource.getKey().substring(prefix.length()); + Matcher matcher = + RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue()); + if (!matcher.matches()) { + String errorMsg = "Invalid resource request specified for property " + + resource.getKey() + ": \"" + resource.getValue() + + "\", expected format is: value[ ][units]"; + LOG.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + long value = Long.parseLong(matcher.group(1)); + String unit = matcher.group(2); + if (unit.isEmpty()) { + unit = ResourceUtils.getDefaultUnit(resourceName); + } + ResourceInformation resourceInformation = new ResourceInformation(); + resourceInformation.setName(resourceName); + resourceInformation.setValue(value); + resourceInformation.setUnits(unit); + result.add(resourceInformation); + } + return result; + } + }