YARN-6629. NPE occurred when a container allocation proposal was applied after its resource requests had already been removed. (Tao Yang via wangda)

Change-Id: I805880f90b3f6798ec96ed8e8e75755f390a9ad5
Wangda Tan 2018-03-28 08:47:31 -07:00
parent cdee0a4f84
commit 47f711eebc
3 changed files with 68 additions and 5 deletions
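
For orientation before the diff: the patch closes a race between the asynchronous commit of an allocation proposal and an AM heartbeat that withdraws the matching resource request. The sketch below is a simplified illustration of that pattern, not the real YARN classes; ProposalTargetSketch, requestKey and outstandingAsks are hypothetical stand-ins. It mirrors the essence of the change: apply() now returns a boolean, it re-checks the outstanding ask count before touching allocation state, and the caller short-circuits accept() && apply() so a stale proposal is rejected rather than dereferencing request data that no longer exists.

    // Simplified sketch of the pattern YARN-6629 introduces; the class and
    // field names below are illustrative stand-ins, not the real YARN types.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class ProposalTargetSketch {
      // hypothetical stand-in for the per-app outstanding resource-request counts
      private final Map<String, Integer> outstandingAsks = new ConcurrentHashMap<>();

      boolean accept(String requestKey) {
        // feasibility check done before the proposal is queued for commit
        return outstandingAsks.getOrDefault(requestKey, 0) > 0;
      }

      boolean apply(String requestKey, boolean updatePending) {
        // re-validate at commit time: the AM may have withdrawn the request
        // after accept() succeeded, so bail out instead of failing with an NPE
        if (updatePending && outstandingAsks.getOrDefault(requestKey, 0) <= 0) {
          return false;
        }
        // ... create the container and decrement the pending ask ...
        outstandingAsks.merge(requestKey, -1, Integer::sum);
        return true;
      }

      boolean tryCommit(String requestKey, boolean updatePending) {
        // a false from either step means the proposal is rejected, not applied
        return accept(requestKey) && apply(requestKey, updatePending);
      }
    }

In the real patch the same idea appears twice: FiCaSchedulerApp#apply returns false when getOutstandingAsksCount(...) has dropped to zero, and CapacityScheduler#tryCommit treats that result exactly like a rejected accept().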

CapacityScheduler.java

@@ -2825,8 +2825,8 @@ public class CapacityScheduler extends
         // proposal might be outdated if AM failover just finished
         // and proposal queue was not be consumed in time
         if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
-          if (app.accept(cluster, request, updatePending)) {
-            app.apply(cluster, request, updatePending);
+          if (app.accept(cluster, request, updatePending)
+              && app.apply(cluster, request, updatePending)) {
             LOG.info("Allocation proposal accepted");
             isSuccess = true;
           } else{

FiCaSchedulerApp.java

@@ -489,7 +489,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return accepted;
   }
 
-  public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
+  public boolean apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
       FiCaSchedulerNode> request, boolean updatePending) {
     boolean reReservation = false;
@@ -502,8 +502,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       allocation = request.getFirstAllocatedOrReservedContainer();
       SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
           schedulerContainer = allocation.getAllocatedOrReservedContainer();
-      RMContainer rmContainer = schedulerContainer.getRmContainer();
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (updatePending &&
+          getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey())
+              <= 0) {
+        return false;
+      }
+
+      RMContainer rmContainer = schedulerContainer.getRmContainer();
       reReservation =
           (!schedulerContainer.isAllocated()) && (rmContainer.getState()
               == RMContainerState.RESERVED);
@@ -545,7 +553,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
               containerRequest);
 
       // If this is from a SchedulingRequest, set allocation tags.
-      if (containerRequest.getSchedulingRequest() != null) {
+      if (containerRequest != null
+          && containerRequest.getSchedulingRequest() != null) {
         ((RMContainerImpl) rmContainer).setAllocationTags(
             containerRequest.getSchedulingRequest().getAllocationTags());
       }
@@ -598,6 +607,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       if (!reReservation) {
         getCSLeafQueue().apply(cluster, request);
       }
+      return true;
     }
 
   public boolean unreserve(SchedulerRequestKey schedulerKey,

TestCapacityScheduler.java

@@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -170,6 +171,8 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -4857,4 +4860,54 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
       }
     }
   }
+
+  @Test (timeout = 60000)
+  public void testClearRequestsBeforeApplyTheProposal()
+      throws Exception {
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
+    rm.start();
+    final MockNM nm = rm.registerNode("h1:1234", 200 * GB);
+
+    // submit app
+    final RMApp app = rm.submitApp(200, "app", "user");
+    MockRM.launchAndRegisterAM(app, rm, nm);
+
+    // spy capacity scheduler to handle CapacityScheduler#apply
+    final Priority priority = Priority.newInstance(1);
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    Mockito.doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        // clear resource request before applying the proposal for container_2
+        spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+            Arrays.asList(ResourceRequest.newInstance(priority, "*",
+                Resources.createResource(1 * GB), 0)), null,
+            Collections.<ContainerId>emptyList(), null, null,
+            NULL_UPDATE_REQUESTS);
+        // trigger real apply which can raise NPE before YARN-6629
+        try {
+          FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+              app.getCurrentAppAttempt().getAppAttemptId());
+          schedulerApp.apply((Resource) invocation.getArguments()[0],
+              (ResourceCommitRequest) invocation.getArguments()[1],
+              (Boolean) invocation.getArguments()[2]);
+          // the proposal of removed request should be rejected
+          Assert.assertEquals(1, schedulerApp.getLiveContainers().size());
+        } catch (Throwable e) {
+          Assert.fail();
+        }
+        return null;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+    // rm allocates container_2 to reproduce the process that can raise NPE
+    spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+        Arrays.asList(ResourceRequest.newInstance(priority, "*",
+            Resources.createResource(1 * GB), 1)), null,
+        Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
+    spyCs.handle(new NodeUpdateSchedulerEvent(
+        spyCs.getNode(nm.getNodeId()).getRMNode()));
+  }
 }