From 2b2e2ac5f421629f1d3ead31d048b6987b274a4d Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Wed, 11 Apr 2018 20:21:05 +0800 Subject: [PATCH] YARN-6629. NPE occurred when container allocation proposal is applied but its resource requests are removed before. Contributed by Tao Yang. --- .../scheduler/capacity/CapacityScheduler.java | 3 +- .../common/fica/FiCaSchedulerApp.java | 12 ++++- .../capacity/TestCapacityScheduler.java | 52 +++++++++++++++++++ 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ab60d8a3bd1..aaae86d9966 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2516,8 +2516,7 @@ public class CapacityScheduler extends // proposal might be outdated if AM failover just finished // and proposal queue was not be consumed in time if (app != null && attemptId.equals(app.getApplicationAttemptId())) { - if (app.accept(cluster, request)) { - app.apply(cluster, request); + if (app.accept(cluster, request) && app.apply(cluster, request)) { LOG.info("Allocation proposal accepted"); } else{ LOG.info("Failed to accept allocation proposal"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 38b721380c9..726f7e2ebdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -484,7 +484,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return accepted; } - public void apply(Resource cluster, + public boolean apply(Resource cluster, ResourceCommitRequest request) { boolean reReservation = false; @@ -497,8 +497,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { allocation = request.getFirstAllocatedOrReservedContainer(); SchedulerContainer schedulerContainer = allocation.getAllocatedOrReservedContainer(); - RMContainer rmContainer = schedulerContainer.getRmContainer(); + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey()) + <= 0) { + return false; + } + + RMContainer rmContainer = schedulerContainer.getRmContainer(); reReservation = (!schedulerContainer.isAllocated()) && (rmContainer.getState() == RMContainerState.RESERVED); @@ -578,6 +585,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if (!reReservation) { getCSLeafQueue().apply(cluster, request); } + return true; } public boolean unreserve(SchedulerRequestKey schedulerKey, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 89e622992fe..396a2aa0265 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -144,6 +145,8 @@ import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; import java.net.InetSocketAddress; @@ -5010,4 +5013,53 @@ public class TestCapacityScheduler { } } } + + @Test (timeout = 30000) + public void testClearRequestsBeforeApplyTheProposal() + throws Exception { + // init RM & NMs & Nodes + final MockRM rm = new MockRM(new CapacitySchedulerConfiguration()); + rm.start(); + final MockNM nm = rm.registerNode("h1:1234", 200 * GB); + + // submit app + final RMApp app = rm.submitApp(200, "app", "user"); + MockRM.launchAndRegisterAM(app, rm, nm); + + // spy capacity scheduler to handle CapacityScheduler#apply + final Priority priority = Priority.newInstance(1); + final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final CapacityScheduler spyCs = Mockito.spy(cs); + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) throws Exception { + // clear resource request before applying the proposal for container_2 + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 0)), + Collections.emptyList(), null, null, + NULL_UPDATE_REQUESTS); + // trigger real apply which can raise NPE before YARN-6629 + try { + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + app.getCurrentAppAttempt().getAppAttemptId()); + schedulerApp.apply((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1]); + // the proposal of removed request should be rejected + Assert.assertEquals(1, schedulerApp.getLiveContainers().size()); + } catch (Throwable e) { + Assert.fail(); + } + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class)); + + // rm allocates container_2 to reproduce the process that can raise NPE + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 1)), + Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); + spyCs.handle(new NodeUpdateSchedulerEvent( + spyCs.getNode(nm.getNodeId()).getRMNode())); + } }