YARN-6629. NPE occurred when container allocation proposal is applied but its resource requests are removed before. (Tao Yang via wangda)
Change-Id: I805880f90b3f6798ec96ed8e8e75755f390a9ad5
(cherry picked from commit 47f711eebc
)
This commit is contained in:
parent
99766992e6
commit
a2b4daab55
|
@ -2789,8 +2789,8 @@ public class CapacityScheduler extends
|
|||
// proposal might be outdated if AM failover just finished
|
||||
// and proposal queue was not be consumed in time
|
||||
if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
|
||||
if (app.accept(cluster, request, updatePending)) {
|
||||
app.apply(cluster, request, updatePending);
|
||||
if (app.accept(cluster, request, updatePending)
|
||||
&& app.apply(cluster, request, updatePending)) {
|
||||
LOG.info("Allocation proposal accepted");
|
||||
isSuccess = true;
|
||||
} else{
|
||||
|
|
|
@ -489,7 +489,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
return accepted;
|
||||
}
|
||||
|
||||
public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
|
||||
public boolean apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
|
||||
FiCaSchedulerNode> request, boolean updatePending) {
|
||||
boolean reReservation = false;
|
||||
|
||||
|
@ -502,8 +502,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
allocation = request.getFirstAllocatedOrReservedContainer();
|
||||
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
|
||||
schedulerContainer = allocation.getAllocatedOrReservedContainer();
|
||||
RMContainer rmContainer = schedulerContainer.getRmContainer();
|
||||
|
||||
// Required sanity check - AM can call 'allocate' to update resource
|
||||
// request without locking the scheduler, hence we need to check
|
||||
if (updatePending &&
|
||||
getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey())
|
||||
<= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
RMContainer rmContainer = schedulerContainer.getRmContainer();
|
||||
reReservation =
|
||||
(!schedulerContainer.isAllocated()) && (rmContainer.getState()
|
||||
== RMContainerState.RESERVED);
|
||||
|
@ -545,7 +553,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
containerRequest);
|
||||
|
||||
// If this is from a SchedulingRequest, set allocation tags.
|
||||
if (containerRequest.getSchedulingRequest() != null) {
|
||||
if (containerRequest != null
|
||||
&& containerRequest.getSchedulingRequest() != null) {
|
||||
((RMContainerImpl) rmContainer).setAllocationTags(
|
||||
containerRequest.getSchedulingRequest().getAllocationTags());
|
||||
}
|
||||
|
@ -598,6 +607,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
if (!reReservation) {
|
||||
getCSLeafQueue().apply(cluster, request);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean unreserve(SchedulerRequestKey schedulerKey,
|
||||
|
|
|
@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||
|
@ -170,6 +171,8 @@ import com.google.common.base.Supplier;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
|
||||
|
@ -4857,4 +4860,54 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testClearRequestsBeforeApplyTheProposal()
|
||||
throws Exception {
|
||||
// init RM & NMs & Nodes
|
||||
final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
|
||||
rm.start();
|
||||
final MockNM nm = rm.registerNode("h1:1234", 200 * GB);
|
||||
|
||||
// submit app
|
||||
final RMApp app = rm.submitApp(200, "app", "user");
|
||||
MockRM.launchAndRegisterAM(app, rm, nm);
|
||||
|
||||
// spy capacity scheduler to handle CapacityScheduler#apply
|
||||
final Priority priority = Priority.newInstance(1);
|
||||
final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
final CapacityScheduler spyCs = Mockito.spy(cs);
|
||||
Mockito.doAnswer(new Answer<Object>() {
|
||||
public Object answer(InvocationOnMock invocation) throws Exception {
|
||||
// clear resource request before applying the proposal for container_2
|
||||
spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
|
||||
Arrays.asList(ResourceRequest.newInstance(priority, "*",
|
||||
Resources.createResource(1 * GB), 0)), null,
|
||||
Collections.<ContainerId>emptyList(), null, null,
|
||||
NULL_UPDATE_REQUESTS);
|
||||
// trigger real apply which can raise NPE before YARN-6629
|
||||
try {
|
||||
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
|
||||
app.getCurrentAppAttempt().getAppAttemptId());
|
||||
schedulerApp.apply((Resource) invocation.getArguments()[0],
|
||||
(ResourceCommitRequest) invocation.getArguments()[1],
|
||||
(Boolean) invocation.getArguments()[2]);
|
||||
// the proposal of removed request should be rejected
|
||||
Assert.assertEquals(1, schedulerApp.getLiveContainers().size());
|
||||
} catch (Throwable e) {
|
||||
Assert.fail();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}).when(spyCs).tryCommit(Mockito.any(Resource.class),
|
||||
Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
|
||||
|
||||
// rm allocates container_2 to reproduce the process that can raise NPE
|
||||
spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
|
||||
Arrays.asList(ResourceRequest.newInstance(priority, "*",
|
||||
Resources.createResource(1 * GB), 1)), null,
|
||||
Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
|
||||
spyCs.handle(new NodeUpdateSchedulerEvent(
|
||||
spyCs.getNode(nm.getNodeId()).getRMNode()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue