YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang.

This commit is contained in:
Akira Ajisaka 2018-11-10 18:02:57 +09:00
parent ca331c8057
commit 50ce9fd4e2
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
2 changed files with 143 additions and 23 deletions

View File

@ -2448,7 +2448,11 @@ public class CapacityScheduler extends
.getContainersToKill().isEmpty()) { .getContainersToKill().isEmpty()) {
list = new ArrayList<>(); list = new ArrayList<>();
for (RMContainer rmContainer : csAssignment.getContainersToKill()) { for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
list.add(getSchedulerContainer(rmContainer, false)); SchedulerContainer schedulerContainer =
getSchedulerContainer(rmContainer, false);
if (schedulerContainer != null) {
list.add(schedulerContainer);
}
} }
} }
@ -2456,10 +2460,16 @@ public class CapacityScheduler extends
if (null == list) { if (null == list) {
list = new ArrayList<>(); list = new ArrayList<>();
} }
list.add( SchedulerContainer schedulerContainer =
getSchedulerContainer(csAssignment.getExcessReservation(), false)); getSchedulerContainer(csAssignment.getExcessReservation(), false);
if (schedulerContainer != null) {
list.add(schedulerContainer);
}
} }
if (list != null && list.isEmpty()) {
list = null;
}
return list; return list;
} }
@ -2499,35 +2509,51 @@ public class CapacityScheduler extends
csAssignment.getAssignmentInformation().getAllocationDetails(); csAssignment.getAssignmentInformation().getAllocationDetails();
if (!allocations.isEmpty()) { if (!allocations.isEmpty()) {
RMContainer rmContainer = allocations.get(0).rmContainer; RMContainer rmContainer = allocations.get(0).rmContainer;
allocated = new ContainerAllocationProposal<>( SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
getSchedulerContainer(rmContainer, true), schedulerContainer = getSchedulerContainer(rmContainer, true);
if (schedulerContainer == null) {
allocated = null;
// Decrease unconfirmed resource if app is alive
FiCaSchedulerApp app = getApplicationAttempt(
rmContainer.getApplicationAttemptId());
if (app != null) {
app.decUnconfirmedRes(rmContainer.getAllocatedResource());
}
} else {
allocated = new ContainerAllocationProposal<>(schedulerContainer,
getSchedulerContainersToRelease(csAssignment), getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), getSchedulerContainer(
false), csAssignment.getType(), csAssignment.getFulfilledReservedContainer(), false),
csAssignment.getRequestLocalityType(), csAssignment.getType(), csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ? csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() : csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
csAssignment.getResource()); csAssignment.getResource());
} }
}
// Reserved something // Reserved something
List<AssignmentInformation.AssignmentDetails> reservation = List<AssignmentInformation.AssignmentDetails> reservation =
csAssignment.getAssignmentInformation().getReservationDetails(); csAssignment.getAssignmentInformation().getReservationDetails();
if (!reservation.isEmpty()) { if (!reservation.isEmpty()) {
RMContainer rmContainer = reservation.get(0).rmContainer; RMContainer rmContainer = reservation.get(0).rmContainer;
reserved = new ContainerAllocationProposal<>( SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
getSchedulerContainer(rmContainer, false), schedulerContainer = getSchedulerContainer(rmContainer, false);
if (schedulerContainer == null) {
reserved = null;
} else {
reserved = new ContainerAllocationProposal<>(schedulerContainer,
getSchedulerContainersToRelease(csAssignment), getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), getSchedulerContainer(
false), csAssignment.getType(), csAssignment.getFulfilledReservedContainer(), false),
csAssignment.getRequestLocalityType(), csAssignment.getType(), csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ? csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() : csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
csAssignment.getResource()); csAssignment.getResource());
} }
} }
}
// When we don't need to allocate/reserve anything, we can feel free to // When we don't need to allocate/reserve anything, we can feel free to
// kill all to-release containers in the request. // kill all to-release containers in the request.

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -41,6 +43,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -50,8 +55,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
@ -559,6 +567,92 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.close(); rm.close();
} }
/**
 * Regression test for YARN-8233: submitting a resource commit request must
 * not throw a NullPointerException when the scheduler container for the
 * allocated container, or for a to-be-released container, cannot be
 * resolved.
 *
 * <p>The test hooks {@code CapacityScheduler#submitResourceCommitRequest}
 * on a spy. On the first allocation (against nm1) the hook decommissions
 * nm1 before delegating to the real scheduler. On the second allocation
 * (against nm2) the hook adds the AM container that was launched on nm1 to
 * the assignment's containers-to-kill. After each allocation the
 * application attempt must still report a pending resource request.
 *
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@Test(timeout = 30000)
public void testReturnNullWhenGetSchedulerContainer() throws Exception {
  // disable async-scheduling for simulating complex scenario
  Configuration disableAsyncConf = new Configuration(conf);
  disableAsyncConf.setBoolean(
      CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);

  // init RM & NMs
  final MockRM rm = new MockRM(disableAsyncConf);
  rm.start();
  // Stop the RM even when an assertion fails, so that a failing run does
  // not leave a started RM behind for the tests that follow.
  try {
    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
    rm.drainEvents();
    CapacityScheduler cs =
        (CapacityScheduler) rm.getRMContext().getScheduler();
    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
    RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
    SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());

    // launch app1-am on nm1
    RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

    // app1 asks 1 * 1G container
    am1.allocate(ImmutableList.of(ResourceRequest
        .newInstance(Priority.newInstance(0), "*",
            Resources.createResource(1 * GB), 1)), null);
    RMContainer amContainer = cs.getRMContainer(
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));

    // spy CapacityScheduler
    final CapacityScheduler spyCs = Mockito.spy(cs);
    // hook CapacityScheduler#submitResourceCommitRequest
    List<CSAssignment> assignmentSnapshots = new ArrayList<>();
    Mockito.doAnswer(new Answer<Object>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Exception {
        CSAssignment assignment =
            (CSAssignment) invocation.getArguments()[1];
        if (cs.getNode(nm1.getNodeId()) != null) {
          // decommission nm1 for first allocation on nm1
          cs.getRMContext().getDispatcher().getEventHandler().handle(
              new RMNodeEvent(nm1.getNodeId(),
                  RMNodeEventType.DECOMMISSION));
          rm.drainEvents();
          Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
          Assert.assertNull(cs.getNode(nm1.getNodeId()));
          assignmentSnapshots.add(assignment);
        } else {
          // add am container on nm1 to containersToKill
          // for second allocation on nm2
          assignment.setContainersToKill(ImmutableList.of(amContainer));
        }
        // check no NPE in actual submit, before YARN-8233 will throw NPE
        cs.submitResourceCommitRequest(
            (Resource) invocation.getArguments()[0], assignment);
        return false;
      }
    }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class),
        Mockito.any(CSAssignment.class));

    // allocation on nm1, test return null when get scheduler container
    CandidateNodeSet<FiCaSchedulerNode> candidateNodeSet =
        new SimpleCandidateNodeSet(sn1);
    spyCs.allocateContainersToNode(candidateNodeSet, false);
    // make sure unconfirmed resource is decreased correctly
    Assert.assertTrue(
        spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
            .hasPendingResourceRequest(
                rm.getResourceScheduler().getResourceCalculator(),
                RMNodeLabelsManager.NO_LABEL,
                rm.getResourceScheduler().getClusterResource(),
                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));

    // allocation on nm2,
    // test return null when get scheduler container to release
    candidateNodeSet =
        new SimpleCandidateNodeSet(sn2);
    spyCs.allocateContainersToNode(candidateNodeSet, false);
    // make sure unconfirmed resource is decreased correctly
    Assert.assertTrue(
        spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
            .hasPendingResourceRequest(
                rm.getResourceScheduler().getResourceCalculator(),
                RMNodeLabelsManager.NO_LABEL,
                rm.getResourceScheduler().getClusterResource(),
                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
  } finally {
    rm.stop();
  }
}
public static class NMHeartbeatThread extends Thread { public static class NMHeartbeatThread extends Thread {
private List<MockNM> mockNMS; private List<MockNM> mockNMS;
private int interval; private int interval;