MAPREDUCE-7309. Improve performance of reading resource request for mapper/reducers from config. Contributed by Peter Bacsko & Wangda Tan.

This commit is contained in:
Peter Bacsko 2020-11-25 11:46:00 +01:00
parent 4638ed94db
commit 7d60217754
2 changed files with 18 additions and 1 deletions

View File

@ -158,6 +158,9 @@ public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
@VisibleForTesting
protected final static Map<TaskType, Resource> RESOURCE_REQUEST_CACHE
= new HashMap<>();
static final Counters EMPTY_COUNTERS = new Counters();
private static final Logger LOG =
LoggerFactory.getLogger(TaskAttemptImpl.class);
@ -172,7 +175,7 @@ public abstract class TaskAttemptImpl implements
private final Clock clock;
private final org.apache.hadoop.mapred.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final Resource resourceCapability;
private Resource resourceCapability;
protected Set<String> dataLocalHosts;
protected Set<String> dataLocalRacks;
private final List<String> diagnostics = new ArrayList<String>();
@ -707,6 +710,10 @@ public abstract class TaskAttemptImpl implements
getResourceTypePrefix(taskType);
boolean memorySet = false;
boolean cpuVcoresSet = false;
if (RESOURCE_REQUEST_CACHE.get(taskType) != null) {
resourceCapability = RESOURCE_REQUEST_CACHE.get(taskType);
return;
}
if (resourceTypePrefix != null) {
List<ResourceInformation> resourceRequests =
ResourceUtils.getRequestedResourcesFromConfig(conf,
@ -767,6 +774,9 @@ public abstract class TaskAttemptImpl implements
if (!cpuVcoresSet) {
this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
}
RESOURCE_REQUEST_CACHE.put(taskType, resourceCapability);
LOG.info("Resource capability of task type {} is set to {}",
taskType, resourceCapability);
}
private String getCpuVcoresKey(TaskType taskType) {

View File

@ -42,6 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -180,6 +181,11 @@ public class TestTaskAttempt{
ResourceUtils.resetResourceTypes(new Configuration());
}
@Before
public void before() {
TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
}
@After
public void tearDown() {
ResourceUtils.resetResourceTypes(new Configuration());
@ -1634,6 +1640,7 @@ public class TestTaskAttempt{
TestAppender testAppender = new TestAppender();
final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
try {
TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear();
logger.addAppender(testAppender);
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();