YARN-6927. Add support for individual resource types requests in MapReduce
(Contributed by Gergo Repas via Daniel Templeton)
(cherry picked from commit 9a7e810838
)
This commit is contained in:
parent
77401022a7
commit
7689d4781a
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang.StringUtils.isEmpty;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -123,6 +125,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
@ -136,6 +139,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.RackResolver;
|
import org.apache.hadoop.yarn.util.RackResolver;
|
||||||
|
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -667,12 +672,8 @@ public abstract class TaskAttemptImpl implements
|
||||||
this.jobFile = jobFile;
|
this.jobFile = jobFile;
|
||||||
this.partition = partition;
|
this.partition = partition;
|
||||||
|
|
||||||
//TODO:create the resource reqt for this Task attempt
|
|
||||||
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
|
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
|
||||||
this.resourceCapability.setMemorySize(
|
populateResourceCapability(taskId.getTaskType());
|
||||||
getMemoryRequired(conf, taskId.getTaskType()));
|
|
||||||
this.resourceCapability.setVirtualCores(
|
|
||||||
getCpuRequired(conf, taskId.getTaskType()));
|
|
||||||
|
|
||||||
this.dataLocalHosts = resolveHosts(dataLocalHosts);
|
this.dataLocalHosts = resolveHosts(dataLocalHosts);
|
||||||
RackResolver.init(conf);
|
RackResolver.init(conf);
|
||||||
|
@ -689,25 +690,137 @@ public abstract class TaskAttemptImpl implements
|
||||||
stateMachine = stateMachineFactory.make(this);
|
stateMachine = stateMachineFactory.make(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void populateResourceCapability(TaskType taskType) {
|
||||||
|
String resourceTypePrefix =
|
||||||
|
getResourceTypePrefix(taskType);
|
||||||
|
boolean memorySet = false;
|
||||||
|
boolean cpuVcoresSet = false;
|
||||||
|
if (resourceTypePrefix != null) {
|
||||||
|
List<ResourceInformation> resourceRequests =
|
||||||
|
ResourceUtils.getRequestedResourcesFromConfig(conf,
|
||||||
|
resourceTypePrefix);
|
||||||
|
for (ResourceInformation resourceRequest : resourceRequests) {
|
||||||
|
String resourceName = resourceRequest.getName();
|
||||||
|
if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
|
||||||
|
resourceName)) {
|
||||||
|
if (memorySet) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Only one of the following keys " +
|
||||||
|
"can be specified for a single job: " +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
|
||||||
|
}
|
||||||
|
String units = isEmpty(resourceRequest.getUnits()) ?
|
||||||
|
ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
|
||||||
|
resourceRequest.getUnits();
|
||||||
|
this.resourceCapability.setMemorySize(
|
||||||
|
UnitsConversionUtil.convert(units, "Mi",
|
||||||
|
resourceRequest.getValue()));
|
||||||
|
memorySet = true;
|
||||||
|
String memoryKey = getMemoryKey(taskType);
|
||||||
|
if (memoryKey != null && conf.get(memoryKey) != null) {
|
||||||
|
LOG.warn("Configuration " + resourceTypePrefix + resourceName +
|
||||||
|
"=" + resourceRequest.getValue() + resourceRequest.getUnits() +
|
||||||
|
" is overriding the " + memoryKey + "=" + conf.get(memoryKey) +
|
||||||
|
" configuration");
|
||||||
|
}
|
||||||
|
} else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(
|
||||||
|
resourceName)) {
|
||||||
|
this.resourceCapability.setVirtualCores(
|
||||||
|
(int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "",
|
||||||
|
resourceRequest.getValue()));
|
||||||
|
cpuVcoresSet = true;
|
||||||
|
String cpuKey = getCpuVcoresKey(taskType);
|
||||||
|
if (cpuKey != null && conf.get(cpuKey) != null) {
|
||||||
|
LOG.warn("Configuration " + resourceTypePrefix +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" +
|
||||||
|
resourceRequest.getValue() + resourceRequest.getUnits() +
|
||||||
|
" is overriding the " + cpuKey + "=" +
|
||||||
|
conf.get(cpuKey) + " configuration");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ResourceInformation resourceInformation =
|
||||||
|
this.resourceCapability.getResourceInformation(resourceName);
|
||||||
|
resourceInformation.setUnits(resourceRequest.getUnits());
|
||||||
|
resourceInformation.setValue(resourceRequest.getValue());
|
||||||
|
this.resourceCapability.setResourceInformation(resourceName,
|
||||||
|
resourceInformation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!memorySet) {
|
||||||
|
this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType));
|
||||||
|
}
|
||||||
|
if (!cpuVcoresSet) {
|
||||||
|
this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getCpuVcoresKey(TaskType taskType) {
|
||||||
|
switch (taskType) {
|
||||||
|
case MAP:
|
||||||
|
return MRJobConfig.MAP_CPU_VCORES;
|
||||||
|
case REDUCE:
|
||||||
|
return MRJobConfig.REDUCE_CPU_VCORES;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getMemoryKey(TaskType taskType) {
|
||||||
|
switch (taskType) {
|
||||||
|
case MAP:
|
||||||
|
return MRJobConfig.MAP_MEMORY_MB;
|
||||||
|
case REDUCE:
|
||||||
|
return MRJobConfig.REDUCE_MEMORY_MB;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getCpuVcoreDefault(TaskType taskType) {
|
||||||
|
switch (taskType) {
|
||||||
|
case MAP:
|
||||||
|
return MRJobConfig.DEFAULT_MAP_CPU_VCORES;
|
||||||
|
case REDUCE:
|
||||||
|
return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private int getMemoryRequired(JobConf conf, TaskType taskType) {
|
private int getMemoryRequired(JobConf conf, TaskType taskType) {
|
||||||
return conf.getMemoryRequired(TypeConverter.fromYarn(taskType));
|
return conf.getMemoryRequired(TypeConverter.fromYarn(taskType));
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getCpuRequired(Configuration conf, TaskType taskType) {
|
private int getCpuRequired(Configuration conf, TaskType taskType) {
|
||||||
int vcores = 1;
|
int vcores = 1;
|
||||||
if (taskType == TaskType.MAP) {
|
String cpuVcoreKey = getCpuVcoresKey(taskType);
|
||||||
vcores =
|
if (cpuVcoreKey != null) {
|
||||||
conf.getInt(MRJobConfig.MAP_CPU_VCORES,
|
Integer defaultCpuVcores = getCpuVcoreDefault(taskType);
|
||||||
MRJobConfig.DEFAULT_MAP_CPU_VCORES);
|
if (null == defaultCpuVcores) {
|
||||||
} else if (taskType == TaskType.REDUCE) {
|
defaultCpuVcores = vcores;
|
||||||
vcores =
|
}
|
||||||
conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
|
vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores);
|
||||||
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return vcores;
|
return vcores;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getResourceTypePrefix(TaskType taskType) {
|
||||||
|
switch (taskType) {
|
||||||
|
case MAP:
|
||||||
|
return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX;
|
||||||
|
case REDUCE:
|
||||||
|
return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX;
|
||||||
|
default:
|
||||||
|
LOG.info("TaskType " + taskType +
|
||||||
|
" does not support custom resource types - this support can be " +
|
||||||
|
"added in " + getClass().getSimpleName());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a {@link LocalResource} record with all the given parameters.
|
* Create a {@link LocalResource} record with all the given parameters.
|
||||||
* The NM that hosts AM container will upload resources to shared cache.
|
* The NM that hosts AM container will upload resources to shared cache.
|
||||||
|
|
|
@ -71,6 +71,17 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
|
||||||
.add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY);
|
.add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY);
|
||||||
configurationPropsToSkipCompare
|
configurationPropsToSkipCompare
|
||||||
.add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
|
.add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
|
||||||
|
|
||||||
|
// Resource type related properties are only prefixes,
|
||||||
|
// they need to be postfixed with the resource name
|
||||||
|
// in order to take effect.
|
||||||
|
// There is nothing to be added to mapred-default.xml
|
||||||
|
configurationPropsToSkipCompare.add(
|
||||||
|
MRJobConfig.MR_AM_RESOURCE_PREFIX);
|
||||||
|
configurationPropsToSkipCompare.add(
|
||||||
|
MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
|
||||||
|
configurationPropsToSkipCompare.add(
|
||||||
|
MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,13 +28,20 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -42,6 +49,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.RawLocalFileSystem;
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
||||||
|
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.JobCounter;
|
import org.apache.hadoop.mapreduce.JobCounter;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
@ -82,24 +90,36 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Event;
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
import org.apache.log4j.AppenderSkeleton;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class TestTaskAttempt{
|
public class TestTaskAttempt{
|
||||||
|
|
||||||
|
private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
|
||||||
|
|
||||||
static public class StubbedFS extends RawLocalFileSystem {
|
static public class StubbedFS extends RawLocalFileSystem {
|
||||||
@Override
|
@Override
|
||||||
public FileStatus getFileStatus(Path f) throws IOException {
|
public FileStatus getFileStatus(Path f) throws IOException {
|
||||||
|
@ -107,6 +127,63 @@ public class TestTaskAttempt{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class CustomResourceTypesConfigurationProvider
|
||||||
|
extends LocalConfigurationProvider {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||||
|
String name) throws YarnException, IOException {
|
||||||
|
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||||
|
return new ByteArrayInputStream(
|
||||||
|
("<configuration>\n" +
|
||||||
|
" <property>\n" +
|
||||||
|
" <name>yarn.resource-types</name>\n" +
|
||||||
|
" <value>a-custom-resource</value>\n" +
|
||||||
|
" </property>\n" +
|
||||||
|
" <property>\n" +
|
||||||
|
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
|
||||||
|
" <value>G</value>\n" +
|
||||||
|
" </property>\n" +
|
||||||
|
"</configuration>\n").getBytes());
|
||||||
|
} else {
|
||||||
|
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestAppender extends AppenderSkeleton {
|
||||||
|
|
||||||
|
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresLayout() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void append(LoggingEvent arg0) {
|
||||||
|
logEvents.add(arg0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<LoggingEvent> getLogEvents() {
|
||||||
|
return logEvents;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() {
|
||||||
|
ResourceUtils.resetResourceTypes(new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
ResourceUtils.resetResourceTypes(new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMRAppHistoryForMap() throws Exception {
|
public void testMRAppHistoryForMap() throws Exception {
|
||||||
MRApp app = new FailingAttemptsMRApp(1, 0);
|
MRApp app = new FailingAttemptsMRApp(1, 0);
|
||||||
|
@ -328,17 +405,18 @@ public class TestTaskAttempt{
|
||||||
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
||||||
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
|
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
|
||||||
Clock clock = SystemClock.getInstance();
|
Clock clock = SystemClock.getInstance();
|
||||||
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
|
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo,
|
||||||
|
clock, new JobConf());
|
||||||
}
|
}
|
||||||
|
|
||||||
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
||||||
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
|
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo,
|
||||||
|
Clock clock, JobConf jobConf) {
|
||||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||||
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||||
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||||
Path jobFile = mock(Path.class);
|
Path jobFile = mock(Path.class);
|
||||||
JobConf jobConf = new JobConf();
|
|
||||||
TaskAttemptImpl taImpl =
|
TaskAttemptImpl taImpl =
|
||||||
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
||||||
taskSplitMetaInfo, jobConf, taListener, null,
|
taskSplitMetaInfo, jobConf, taListener, null,
|
||||||
|
@ -346,6 +424,20 @@ public class TestTaskAttempt{
|
||||||
return taImpl;
|
return taImpl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TaskAttemptImpl createReduceTaskAttemptImplForTest(
|
||||||
|
EventHandler eventHandler, Clock clock, JobConf jobConf) {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||||
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
|
||||||
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||||
|
Path jobFile = mock(Path.class);
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
||||||
|
1, jobConf, taListener, null,
|
||||||
|
null, clock, null);
|
||||||
|
return taImpl;
|
||||||
|
}
|
||||||
|
|
||||||
private void testMRAppHistory(MRApp app) throws Exception {
|
private void testMRAppHistory(MRApp app) throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
Job job = app.submit(conf);
|
Job job = app.submit(conf);
|
||||||
|
@ -1412,6 +1504,259 @@ public class TestTaskAttempt{
|
||||||
assertFalse("InternalError occurred", eventHandler.internalError);
|
assertFalse("InternalError occurred", eventHandler.internalError);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMapperCustomResourceTypes() {
|
||||||
|
initResourceTypes();
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo();
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX
|
||||||
|
+ CUSTOM_RESOURCE_NAME, 7L);
|
||||||
|
TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler,
|
||||||
|
taskSplitMetaInfo, clock, jobConf);
|
||||||
|
ResourceInformation resourceInfo =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getResourceInformation(CUSTOM_RESOURCE_NAME);
|
||||||
|
assertEquals("Expecting the default unit (G)",
|
||||||
|
"G", resourceInfo.getUnits());
|
||||||
|
assertEquals(7L, resourceInfo.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerCustomResourceTypes() {
|
||||||
|
initResourceTypes();
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
|
||||||
|
+ CUSTOM_RESOURCE_NAME, "3m");
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
ResourceInformation resourceInfo =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getResourceInformation(CUSTOM_RESOURCE_NAME);
|
||||||
|
assertEquals("Expecting the specified unit (m)",
|
||||||
|
"m", resourceInfo.getUnits());
|
||||||
|
assertEquals(3L, resourceInfo.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
long memorySize =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getMemorySize();
|
||||||
|
assertEquals(2048, memorySize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi");
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
long memorySize =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getMemorySize();
|
||||||
|
assertEquals(2048, memorySize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerMemoryRequestDefaultMemory() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
|
||||||
|
long memorySize =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getMemorySize();
|
||||||
|
assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerMemoryRequestWithoutUnits() {
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
for (String memoryResourceName : ImmutableList.of(
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
||||||
|
memoryResourceName, 2048);
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
long memorySize =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getMemorySize();
|
||||||
|
assertEquals(2048, memorySize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerMemoryRequestOverriding() {
|
||||||
|
for (String memoryName : ImmutableList.of(
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
||||||
|
TestAppender testAppender = new TestAppender();
|
||||||
|
final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
|
||||||
|
try {
|
||||||
|
logger.addAppender(testAppender);
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
|
||||||
|
"3Gi");
|
||||||
|
jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
long memorySize =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getMemorySize();
|
||||||
|
assertEquals(3072, memorySize);
|
||||||
|
assertTrue(testAppender.getLogEvents().stream()
|
||||||
|
.anyMatch(e -> e.getLevel() == Level.WARN && ("Configuration " +
|
||||||
|
"mapreduce.reduce.resource." + memoryName + "=3Gi is " +
|
||||||
|
"overriding the mapreduce.reduce.memory.mb=2048 configuration")
|
||||||
|
.equals(e.getMessage())));
|
||||||
|
} finally {
|
||||||
|
logger.removeAppender(testAppender);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected=IllegalArgumentException.class)
|
||||||
|
public void testReducerMemoryRequestMultipleName() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
for (String memoryName : ImmutableList.of(
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
|
||||||
|
"3Gi");
|
||||||
|
}
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3);
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
int vCores =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getVirtualCores();
|
||||||
|
assertEquals(3, vCores);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5");
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
int vCores =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getVirtualCores();
|
||||||
|
assertEquals(5, vCores);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerCpuRequestDefaultMemory() {
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
|
||||||
|
int vCores =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getVirtualCores();
|
||||||
|
assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReducerCpuRequestOverriding() {
|
||||||
|
TestAppender testAppender = new TestAppender();
|
||||||
|
final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
|
||||||
|
try {
|
||||||
|
logger.addAppender(testAppender);
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7");
|
||||||
|
jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9);
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
long vCores =
|
||||||
|
getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
||||||
|
getVirtualCores();
|
||||||
|
assertEquals(7, vCores);
|
||||||
|
assertTrue(testAppender.getLogEvents().stream().anyMatch(
|
||||||
|
e -> e.getLevel() == Level.WARN && ("Configuration " +
|
||||||
|
"mapreduce.reduce.resource.vcores=7 is overriding the " +
|
||||||
|
"mapreduce.reduce.cpu.vcores=9 configuration").equals(
|
||||||
|
e.getMessage())));
|
||||||
|
} finally {
|
||||||
|
logger.removeAppender(testAppender);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Resource getResourceInfoFromContainerRequest(
|
||||||
|
TaskAttemptImpl taImpl, EventHandler eventHandler) {
|
||||||
|
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
||||||
|
TaskAttemptEventType.TA_SCHEDULE));
|
||||||
|
|
||||||
|
assertEquals("Task attempt is not in STARTING state", taImpl.getState(),
|
||||||
|
TaskAttemptState.STARTING);
|
||||||
|
|
||||||
|
ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
|
||||||
|
verify(eventHandler, times(2)).handle(captor.capture());
|
||||||
|
|
||||||
|
List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>();
|
||||||
|
for (Event e : captor.getAllValues()) {
|
||||||
|
if (e instanceof ContainerRequestEvent) {
|
||||||
|
containerRequestEvents.add((ContainerRequestEvent) e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals("Expected one ContainerRequestEvent after scheduling "
|
||||||
|
+ "task attempt", 1, containerRequestEvents.size());
|
||||||
|
|
||||||
|
return containerRequestEvents.get(0).getCapability();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected=IllegalArgumentException.class)
|
||||||
|
public void testReducerCustomResourceTypeWithInvalidUnit() {
|
||||||
|
initResourceTypes();
|
||||||
|
EventHandler eventHandler = mock(EventHandler.class);
|
||||||
|
Clock clock = SystemClock.getInstance();
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
|
||||||
|
+ CUSTOM_RESOURCE_NAME, "3z");
|
||||||
|
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initResourceTypes() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||||
|
CustomResourceTypesConfigurationProvider.class.getName());
|
||||||
|
ResourceUtils.resetResourceTypes(conf);
|
||||||
|
}
|
||||||
|
|
||||||
private void setupTaskAttemptFinishingMonitor(
|
private void setupTaskAttemptFinishingMonitor(
|
||||||
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
|
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
|
||||||
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
|
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
|
||||||
|
|
|
@ -363,12 +363,47 @@ public interface MRJobConfig {
|
||||||
|
|
||||||
public static final String MAP_INPUT_START = "mapreduce.map.input.start";
|
public static final String MAP_INPUT_START = "mapreduce.map.input.start";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration key for specifying memory requirement for the mapper.
|
||||||
|
* Kept for backward-compatibility, mapreduce.map.resource.memory
|
||||||
|
* is the new preferred way to specify this.
|
||||||
|
*/
|
||||||
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
|
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
|
||||||
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
|
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration key for specifying CPU requirement for the mapper.
|
||||||
|
* Kept for backward-compatibility, mapreduce.map.resource.vcores
|
||||||
|
* is the new preferred way to specify this.
|
||||||
|
*/
|
||||||
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
|
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
|
||||||
public static final int DEFAULT_MAP_CPU_VCORES = 1;
|
public static final int DEFAULT_MAP_CPU_VCORES = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Custom resource names required by the mapper should be
|
||||||
|
* appended to this prefix, the value's format is {amount}[ ][{unit}].
|
||||||
|
* If no unit is defined, the default unit will be used.
|
||||||
|
* Standard resource names: memory (default unit: Mi), vcores
|
||||||
|
*/
|
||||||
|
public static final String MAP_RESOURCE_TYPE_PREFIX =
|
||||||
|
"mapreduce.map.resource.";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resource type name for CPU vcores.
|
||||||
|
*/
|
||||||
|
public static final String RESOURCE_TYPE_NAME_VCORE = "vcores";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resource type name for memory.
|
||||||
|
*/
|
||||||
|
public static final String RESOURCE_TYPE_NAME_MEMORY = "memory";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Alternative resource type name for memory.
|
||||||
|
*/
|
||||||
|
public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY =
|
||||||
|
"memory-mb";
|
||||||
|
|
||||||
public static final String MAP_ENV = "mapreduce.map.env";
|
public static final String MAP_ENV = "mapreduce.map.env";
|
||||||
|
|
||||||
public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
|
public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
|
||||||
|
@ -417,12 +452,31 @@ public interface MRJobConfig {
|
||||||
|
|
||||||
public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
|
public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration key for specifying memory requirement for the reducer.
|
||||||
|
* Kept for backward-compatibility, mapreduce.reduce.resource.memory
|
||||||
|
* is the new preferred way to specify this.
|
||||||
|
*/
|
||||||
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
|
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
|
||||||
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
|
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration key for specifying CPU requirement for the reducer.
|
||||||
|
* Kept for backward-compatibility, mapreduce.reduce.resource.vcores
|
||||||
|
* is the new preferred way to specify this.
|
||||||
|
*/
|
||||||
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
|
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
|
||||||
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
|
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resource names required by the reducer should be
|
||||||
|
* appended to this prefix, the value's format is {amount}[ ][{unit}].
|
||||||
|
* If no unit is defined, the default unit will be used.
|
||||||
|
* Standard resource names: memory (default unit: Mi), vcores
|
||||||
|
*/
|
||||||
|
public static final String REDUCE_RESOURCE_TYPE_PREFIX =
|
||||||
|
"mapreduce.reduce.resource.";
|
||||||
|
|
||||||
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
|
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
|
||||||
|
|
||||||
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
|
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
|
||||||
|
@ -608,7 +662,10 @@ public interface MRJobConfig {
|
||||||
public static final String DEFAULT_MR_AM_STAGING_DIR =
|
public static final String DEFAULT_MR_AM_STAGING_DIR =
|
||||||
"/tmp/hadoop-yarn/staging";
|
"/tmp/hadoop-yarn/staging";
|
||||||
|
|
||||||
/** The amount of memory the MR app master needs.*/
|
/** The amount of memory the MR app master needs.
|
||||||
|
* Kept for backward-compatibility, yarn.app.mapreduce.am.resource.memory is
|
||||||
|
* the new preferred way to specify this
|
||||||
|
*/
|
||||||
public static final String MR_AM_VMEM_MB =
|
public static final String MR_AM_VMEM_MB =
|
||||||
MR_AM_PREFIX+"resource.mb";
|
MR_AM_PREFIX+"resource.mb";
|
||||||
public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
|
public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
|
||||||
|
@ -618,6 +675,15 @@ public interface MRJobConfig {
|
||||||
MR_AM_PREFIX+"resource.cpu-vcores";
|
MR_AM_PREFIX+"resource.cpu-vcores";
|
||||||
public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
|
public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resource names required by the MR AM should be
|
||||||
|
* appended to this prefix, the value's format is {amount}[ ][{unit}].
|
||||||
|
* If no unit is defined, the default unit will be used
|
||||||
|
* Standard resource names: memory (default unit: Mi), vcores
|
||||||
|
*/
|
||||||
|
public static final String MR_AM_RESOURCE_PREFIX =
|
||||||
|
MR_AM_PREFIX + "resource.";
|
||||||
|
|
||||||
/** Command line arguments passed to the MR app master.*/
|
/** Command line arguments passed to the MR app master.*/
|
||||||
public static final String MR_AM_COMMAND_OPTS =
|
public static final String MR_AM_COMMAND_OPTS =
|
||||||
MR_AM_PREFIX+"command-opts";
|
MR_AM_PREFIX+"command-opts";
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang.StringUtils.isEmpty;
|
||||||
|
import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
@ -93,6 +97,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -659,16 +665,76 @@ public class YARNRunner implements ClientProtocol {
|
||||||
|
|
||||||
private List<ResourceRequest> generateResourceRequests() throws IOException {
|
private List<ResourceRequest> generateResourceRequests() throws IOException {
|
||||||
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
||||||
capability.setMemorySize(
|
boolean memorySet = false;
|
||||||
conf.getInt(
|
boolean cpuVcoresSet = false;
|
||||||
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
|
List<ResourceInformation> resourceRequests = ResourceUtils
|
||||||
)
|
.getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX);
|
||||||
);
|
for (ResourceInformation resourceReq : resourceRequests) {
|
||||||
capability.setVirtualCores(
|
String resourceName = resourceReq.getName();
|
||||||
conf.getInt(
|
if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
|
||||||
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
|
||||||
)
|
resourceName)) {
|
||||||
);
|
if (memorySet) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Only one of the following keys " +
|
||||||
|
"can be specified for a single job: " +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
|
||||||
|
}
|
||||||
|
String units = isEmpty(resourceReq.getUnits()) ?
|
||||||
|
ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
|
||||||
|
resourceReq.getUnits();
|
||||||
|
capability.setMemorySize(
|
||||||
|
UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue()));
|
||||||
|
memorySet = true;
|
||||||
|
if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) {
|
||||||
|
LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
|
||||||
|
resourceName + "=" + resourceReq.getValue() +
|
||||||
|
resourceReq.getUnits() + " is overriding the " +
|
||||||
|
MRJobConfig.MR_AM_VMEM_MB + "=" +
|
||||||
|
conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration");
|
||||||
|
}
|
||||||
|
} else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) {
|
||||||
|
capability.setVirtualCores(
|
||||||
|
(int) UnitsConversionUtil.convert(resourceReq.getUnits(), "",
|
||||||
|
resourceReq.getValue()));
|
||||||
|
cpuVcoresSet = true;
|
||||||
|
if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) {
|
||||||
|
LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
|
||||||
|
resourceName + "=" + resourceReq.getValue() +
|
||||||
|
resourceReq.getUnits() + " is overriding the " +
|
||||||
|
MRJobConfig.MR_AM_CPU_VCORES + "=" +
|
||||||
|
conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration");
|
||||||
|
}
|
||||||
|
} else if (!MRJobConfig.MR_AM_VMEM_MB.equals(
|
||||||
|
MR_AM_RESOURCE_PREFIX + resourceName) &&
|
||||||
|
!MRJobConfig.MR_AM_CPU_VCORES.equals(
|
||||||
|
MR_AM_RESOURCE_PREFIX + resourceName)) {
|
||||||
|
// the "mb", "cpu-vcores" resource types are not processed here
|
||||||
|
// since the yarn.app.mapreduce.am.resource.mb,
|
||||||
|
// yarn.app.mapreduce.am.resource.cpu-vcores keys are used for
|
||||||
|
// backward-compatibility - which is handled after this loop
|
||||||
|
ResourceInformation resourceInformation = capability
|
||||||
|
.getResourceInformation(resourceName);
|
||||||
|
resourceInformation.setUnits(resourceReq.getUnits());
|
||||||
|
resourceInformation.setValue(resourceReq.getValue());
|
||||||
|
capability.setResourceInformation(resourceName, resourceInformation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!memorySet) {
|
||||||
|
capability.setMemorySize(
|
||||||
|
conf.getInt(
|
||||||
|
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (!cpuVcoresSet) {
|
||||||
|
capability.setVirtualCores(
|
||||||
|
conf.getInt(
|
||||||
|
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("AppMaster capability = " + capability);
|
LOG.debug("AppMaster capability = " + capability);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,10 +33,12 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -44,6 +46,7 @@ import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -69,6 +72,7 @@ import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
|
@ -96,28 +100,37 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.log4j.Appender;
|
import org.apache.log4j.Appender;
|
||||||
|
import org.apache.log4j.AppenderSkeleton;
|
||||||
import org.apache.log4j.Layout;
|
import org.apache.log4j.Layout;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.log4j.SimpleLayout;
|
import org.apache.log4j.SimpleLayout;
|
||||||
import org.apache.log4j.WriterAppender;
|
import org.apache.log4j.WriterAppender;
|
||||||
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test YarnRunner and make sure the client side plugin works
|
* Test YarnRunner and make sure the client side plugin works
|
||||||
* fine
|
* fine
|
||||||
|
@ -131,6 +144,53 @@ public class TestYARNRunner {
|
||||||
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
|
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
|
||||||
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
|
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
|
||||||
|
|
||||||
|
private static class CustomResourceTypesConfigurationProvider
|
||||||
|
extends LocalConfigurationProvider {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||||
|
String name) throws YarnException, IOException {
|
||||||
|
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||||
|
return new ByteArrayInputStream(
|
||||||
|
("<configuration>\n" +
|
||||||
|
" <property>\n" +
|
||||||
|
" <name>yarn.resource-types</name>\n" +
|
||||||
|
" <value>a-custom-resource</value>\n" +
|
||||||
|
" </property>\n" +
|
||||||
|
" <property>\n" +
|
||||||
|
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
|
||||||
|
" <value>G</value>\n" +
|
||||||
|
" </property>\n" +
|
||||||
|
"</configuration>\n").getBytes());
|
||||||
|
} else {
|
||||||
|
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestAppender extends AppenderSkeleton {
|
||||||
|
|
||||||
|
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requiresLayout() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void append(LoggingEvent arg0) {
|
||||||
|
logEvents.add(arg0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<LoggingEvent> getLogEvents() {
|
||||||
|
return logEvents;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private YARNRunner yarnRunner;
|
private YARNRunner yarnRunner;
|
||||||
private ResourceMgrDelegate resourceMgrDelegate;
|
private ResourceMgrDelegate resourceMgrDelegate;
|
||||||
private YarnConfiguration conf;
|
private YarnConfiguration conf;
|
||||||
|
@ -143,6 +203,11 @@ public class TestYARNRunner {
|
||||||
private ClientServiceDelegate clientDelegate;
|
private ClientServiceDelegate clientDelegate;
|
||||||
private static final String failString = "Rejected job";
|
private static final String failString = "Rejected job";
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() {
|
||||||
|
ResourceUtils.resetResourceTypes(new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
|
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
|
||||||
|
@ -175,6 +240,7 @@ public class TestYARNRunner {
|
||||||
@After
|
@After
|
||||||
public void cleanup() {
|
public void cleanup() {
|
||||||
FileUtil.fullyDelete(testWorkDir);
|
FileUtil.fullyDelete(testWorkDir);
|
||||||
|
ResourceUtils.resetResourceTypes(new Configuration());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=20000)
|
@Test(timeout=20000)
|
||||||
|
@ -881,4 +947,99 @@ public class TestYARNRunner {
|
||||||
.get("hadoop.tmp.dir").equals("testconfdir"));
|
.get("hadoop.tmp.dir").equals("testconfdir"));
|
||||||
UserGroupInformation.reset();
|
UserGroupInformation.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomAMRMResourceType() throws Exception {
|
||||||
|
initResourceTypes();
|
||||||
|
String customResourceName = "a-custom-resource";
|
||||||
|
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
|
||||||
|
jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
|
||||||
|
customResourceName, 5);
|
||||||
|
jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
|
||||||
|
|
||||||
|
yarnRunner = new YARNRunner(jobConf);
|
||||||
|
|
||||||
|
submissionContext = buildSubmitContext(yarnRunner, jobConf);
|
||||||
|
|
||||||
|
List<ResourceRequest> resourceRequests =
|
||||||
|
submissionContext.getAMContainerResourceRequests();
|
||||||
|
|
||||||
|
Assert.assertEquals(1, resourceRequests.size());
|
||||||
|
ResourceRequest resourceRequest = resourceRequests.get(0);
|
||||||
|
|
||||||
|
ResourceInformation resourceInformation = resourceRequest.getCapability()
|
||||||
|
.getResourceInformation(customResourceName);
|
||||||
|
Assert.assertEquals("Expecting the default unit (G)",
|
||||||
|
"G", resourceInformation.getUnits());
|
||||||
|
Assert.assertEquals(5L, resourceInformation.getValue());
|
||||||
|
Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAMRMemoryRequest() throws Exception {
|
||||||
|
for (String memoryName : ImmutableList.of(
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
|
||||||
|
|
||||||
|
yarnRunner = new YARNRunner(jobConf);
|
||||||
|
|
||||||
|
submissionContext = buildSubmitContext(yarnRunner, jobConf);
|
||||||
|
|
||||||
|
List<ResourceRequest> resourceRequests =
|
||||||
|
submissionContext.getAMContainerResourceRequests();
|
||||||
|
|
||||||
|
Assert.assertEquals(1, resourceRequests.size());
|
||||||
|
ResourceRequest resourceRequest = resourceRequests.get(0);
|
||||||
|
|
||||||
|
long memorySize = resourceRequest.getCapability().getMemorySize();
|
||||||
|
Assert.assertEquals(3072, memorySize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAMRMemoryRequestOverriding() throws Exception {
|
||||||
|
for (String memoryName : ImmutableList.of(
|
||||||
|
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
||||||
|
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
||||||
|
TestAppender testAppender = new TestAppender();
|
||||||
|
Logger logger = Logger.getLogger(YARNRunner.class);
|
||||||
|
logger.addAppender(testAppender);
|
||||||
|
try {
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
|
||||||
|
jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048);
|
||||||
|
|
||||||
|
yarnRunner = new YARNRunner(jobConf);
|
||||||
|
|
||||||
|
submissionContext = buildSubmitContext(yarnRunner, jobConf);
|
||||||
|
|
||||||
|
List<ResourceRequest> resourceRequests =
|
||||||
|
submissionContext.getAMContainerResourceRequests();
|
||||||
|
|
||||||
|
Assert.assertEquals(1, resourceRequests.size());
|
||||||
|
ResourceRequest resourceRequest = resourceRequests.get(0);
|
||||||
|
|
||||||
|
long memorySize = resourceRequest.getCapability().getMemorySize();
|
||||||
|
Assert.assertEquals(3072, memorySize);
|
||||||
|
assertTrue(testAppender.getLogEvents().stream().anyMatch(
|
||||||
|
e -> e.getLevel() == Level.WARN && ("Configuration " +
|
||||||
|
"yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " +
|
||||||
|
"overriding the yarn.app.mapreduce.am.resource.mb=2048 " +
|
||||||
|
"configuration").equals(e.getMessage())));
|
||||||
|
} finally {
|
||||||
|
logger.removeAppender(testAppender);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initResourceTypes() {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||||
|
CustomResourceTypesConfigurationProvider.class.getName());
|
||||||
|
ResourceUtils.resetResourceTypes(configuration);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,10 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to read the resource-types to be supported by the system.
|
* Helper class to read the resource-types to be supported by the system.
|
||||||
|
@ -58,6 +61,8 @@ public class ResourceUtils {
|
||||||
|
|
||||||
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
||||||
private static final String VCORES = ResourceInformation.VCORES.getName();
|
private static final String VCORES = ResourceInformation.VCORES.getName();
|
||||||
|
private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
|
||||||
|
Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$");
|
||||||
|
|
||||||
private static volatile boolean initializedResources = false;
|
private static volatile boolean initializedResources = false;
|
||||||
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
|
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
|
||||||
|
@ -573,4 +578,43 @@ public class ResourceUtils {
|
||||||
}
|
}
|
||||||
return array;
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* From a given configuration get all entries representing requested
|
||||||
|
* resources: entries that match the {prefix}{resourceName}={value}[{units}]
|
||||||
|
* pattern.
|
||||||
|
* @param configuration The configuration
|
||||||
|
* @param prefix Keys with this prefix are considered from the configuration
|
||||||
|
* @return The list of requested resources as described by the configuration
|
||||||
|
*/
|
||||||
|
public static List<ResourceInformation> getRequestedResourcesFromConfig(
|
||||||
|
Configuration configuration, String prefix) {
|
||||||
|
List<ResourceInformation> result = new ArrayList<>();
|
||||||
|
Map<String, String> customResourcesMap = configuration
|
||||||
|
.getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$");
|
||||||
|
for (Entry<String, String> resource : customResourcesMap.entrySet()) {
|
||||||
|
String resourceName = resource.getKey().substring(prefix.length());
|
||||||
|
Matcher matcher =
|
||||||
|
RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue());
|
||||||
|
if (!matcher.matches()) {
|
||||||
|
String errorMsg = "Invalid resource request specified for property "
|
||||||
|
+ resource.getKey() + ": \"" + resource.getValue()
|
||||||
|
+ "\", expected format is: value[ ][units]";
|
||||||
|
LOG.error(errorMsg);
|
||||||
|
throw new IllegalArgumentException(errorMsg);
|
||||||
|
}
|
||||||
|
long value = Long.parseLong(matcher.group(1));
|
||||||
|
String unit = matcher.group(2);
|
||||||
|
if (unit.isEmpty()) {
|
||||||
|
unit = ResourceUtils.getDefaultUnit(resourceName);
|
||||||
|
}
|
||||||
|
ResourceInformation resourceInformation = new ResourceInformation();
|
||||||
|
resourceInformation.setName(resourceName);
|
||||||
|
resourceInformation.setValue(value);
|
||||||
|
resourceInformation.setUnits(unit);
|
||||||
|
result.add(resourceInformation);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue