MAPREDUCE-5002. AM could potentially allocate a reduce container to a map attempt. Contributed by Chang Li

This commit is contained in:
Jason Lowe 2015-09-17 18:17:29 +00:00
parent 58d1a02b8d
commit 3f82f582e5
3 changed files with 119 additions and 4 deletions

View File

@ -570,6 +570,9 @@ Release 2.8.0 - UNRELEASED
position/key information for uncompressed input sometimes. (Zhihai Xu via
jlowe)
MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
attempt (Chang Li via jlowe)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1004,6 +1004,7 @@ public class RMContainerAllocator extends RMContainerRequestor
Iterator<Container> it = allocatedContainers.iterator();
LOG.info("Got allocated containers " + allocatedContainers.size());
containersAllocated += allocatedContainers.size();
int reducePending = reduces.size();
while (it.hasNext()) {
Container allocated = it.next();
if (LOG.isDebugEnabled()) {
@ -1034,13 +1035,14 @@ public class RMContainerAllocator extends RMContainerRequestor
else if (PRIORITY_REDUCE.equals(priority)) {
if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
reduceResourceRequest, getSchedulerResourceTypes()) <= 0
|| reduces.isEmpty()) {
|| (reducePending <= 0)) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
+ " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty());
+ " or no pending reduce tasks.");
isAssignable = false;
} else {
reducePending--;
}
} else {
LOG.warn("Container allocated at unwanted priority: " + priority +

View File

@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@ -562,6 +563,52 @@ public class TestRMContainerAllocator {
assignedRequests.preemptionWaitingReduces.size());
}
@Test(timeout = 30000)
public void testExcessReduceContainerAssign() throws Exception {
final Configuration conf = new Configuration();
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
final MyResourceManager2 rm = new MyResourceManager2(conf);
rm.start();
final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
.getDispatcher();
final RMApp app = rm.submitApp(2048);
dispatcher.await();
final String host = "host1";
final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
nm.nodeHeartbeat(true);
dispatcher.await();
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
final JobId jobId = MRBuilderUtils
.newJobId(appAttemptId.getApplicationId(), 0);
final Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// request to allocate two reduce priority containers
final String[] locations = new String[] { host };
allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
allocator.scheduleAllReduces();
allocator.makeRemoteRequest();
nm.nodeHeartbeat(true);
dispatcher.await();
allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
int assignedContainer;
for (assignedContainer = 0; assignedContainer < 1;) {
assignedContainer += allocator.schedule().size();
nm.nodeHeartbeat(true);
dispatcher.await();
}
// only 1 allocated container should be assigned
Assert.assertEquals(assignedContainer, 1);
}
@Test
public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
@ -770,6 +817,17 @@ public class TestRMContainerAllocator {
}
}
private static class MyResourceManager2 extends MyResourceManager {
public MyResourceManager2(Configuration conf) {
super(conf);
}
@Override
protected ResourceScheduler createScheduler() {
return new ExcessReduceContainerAllocateScheduler(this.getRMContext());
}
}
@Test
public void testReportedAppProgress() throws Exception {
@ -1595,6 +1653,58 @@ public class TestRMContainerAllocator {
}
}
private static class ExcessReduceContainerAllocateScheduler extends FifoScheduler {
public ExcessReduceContainerAllocateScheduler(RMContext rmContext) {
super();
try {
Configuration conf = new Configuration();
reinitialize(conf, rmContext);
} catch (IOException ie) {
LOG.info("add application failed with ", ie);
assert (false);
}
}
@Override
public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
.getPriority(), req.getResourceName(), req.getCapability(), req
.getNumContainers(), req.getRelaxLocality());
askCopy.add(reqCopy);
}
SecurityUtil.setTokenServiceUseIp(false);
Allocation normalAlloc = super.allocate(
applicationAttemptId, askCopy, release,
blacklistAdditions, blacklistRemovals);
List<Container> containers = normalAlloc.getContainers();
if(containers.size() > 0) {
// allocate excess container
FiCaSchedulerApp application = super.getApplicationAttempt(applicationAttemptId);
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
Container excessC = mock(Container.class);
when(excessC.getId()).thenReturn(containerId);
when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE);
Resource mockR = mock(Resource.class);
when(mockR.getMemory()).thenReturn(2048);
when(excessC.getResource()).thenReturn(mockR);
NodeId nId = mock(NodeId.class);
when(nId.getHost()).thenReturn("local");
when(excessC.getNodeId()).thenReturn(nId);
containers.add(excessC);
}
Allocation excessAlloc = mock(Allocation.class);
when(excessAlloc.getContainers()).thenReturn(containers);
return excessAlloc;
}
}
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int memory, String[] hosts) {
return createReq(jobId, taskAttemptId, memory, hosts, false, false);