From 7d6021775489cc4c45e42e4419e794439deb7034 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Wed, 25 Nov 2020 11:46:00 +0100 Subject: [PATCH] MAPREDUCE-7309. Improve performance of reading resource request for mapper/reducers from config. Contributed by Peter Bacsko & Wangda Tan. --- .../mapreduce/v2/app/job/impl/TaskAttemptImpl.java | 12 +++++++++++- .../mapreduce/v2/app/job/impl/TestTaskAttempt.java | 7 +++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 6d1ac61250a..0007cdd42a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -158,6 +158,9 @@ public abstract class TaskAttemptImpl implements org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, EventHandler { + @VisibleForTesting + protected final static Map RESOURCE_REQUEST_CACHE + = new HashMap<>(); static final Counters EMPTY_COUNTERS = new Counters(); private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class); @@ -172,7 +175,7 @@ public abstract class TaskAttemptImpl implements private final Clock clock; private final org.apache.hadoop.mapred.JobID oldJobId; private final TaskAttemptListener taskAttemptListener; - private final Resource resourceCapability; + private Resource resourceCapability; protected Set dataLocalHosts; protected Set dataLocalRacks; private final List diagnostics = new ArrayList(); @@ -707,6 +710,10 @@ private void populateResourceCapability(TaskType taskType) { getResourceTypePrefix(taskType); boolean memorySet = false; boolean cpuVcoresSet = false; + if (RESOURCE_REQUEST_CACHE.get(taskType) != null) { + resourceCapability = RESOURCE_REQUEST_CACHE.get(taskType); + return; + } if (resourceTypePrefix != null) { List resourceRequests = ResourceUtils.getRequestedResourcesFromConfig(conf, @@ -767,6 +774,9 @@ private void populateResourceCapability(TaskType taskType) { if (!cpuVcoresSet) { this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType)); } + RESOURCE_REQUEST_CACHE.put(taskType, resourceCapability); + LOG.info("Resource capability of task type {} is set to {}", + taskType, resourceCapability); } private String getCpuVcoresKey(TaskType taskType) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 947b1e92edc..14e2e36c6c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -180,6 +181,11 @@ public static void setupBeforeClass() { ResourceUtils.resetResourceTypes(new Configuration()); } + @Before + public void before() { + TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear(); + } + @After public void tearDown() { ResourceUtils.resetResourceTypes(new Configuration()); @@ -1634,6 +1640,7 @@ public void testReducerMemoryRequestOverriding() { TestAppender testAppender = new TestAppender(); final Logger logger = Logger.getLogger(TaskAttemptImpl.class); try { + TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear(); logger.addAppender(testAppender); EventHandler eventHandler = mock(EventHandler.class); Clock clock = SystemClock.getInstance();