From b4c4f365948d36b36942f912ef994c1c21ba59e3 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 5 Apr 2017 15:42:55 -0700 Subject: [PATCH] YARN-6448. Continuous scheduling thread crashes while sorting nodes. (Yufei Gu via kasha) --- .../scheduler/SchedulerNode.java | 4 ++- .../scheduler/fair/FairScheduler.java | 8 +++-- .../fair/TestContinuousScheduling.java | 36 +++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index db17b42d021..af4a0012826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -286,7 +287,8 @@ private synchronized void addUnallocatedResource(Resource resource) { * container. * @param resource Resources to deduct. */ - private synchronized void deductUnallocatedResource(Resource resource) { + @VisibleForTesting + public synchronized void deductUnallocatedResource(Resource resource) { if (resource == null) { LOG.error("Invalid deduction of null resource for " + rmNode.getNodeAddress()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f3fde76f06a..98c14ace6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -913,8 +913,12 @@ protected void nodeUpdate(RMNode nm) { void continuousSchedulingAttempt() throws InterruptedException { long start = getClock().getTime(); - List nodeIdList = - nodeTracker.sortedNodeList(nodeAvailableResourceComparator); + List nodeIdList; + // Hold a lock to prevent comparator order changes due to changes of node + // unallocated resources + synchronized (this) { + nodeIdList = nodeTracker.sortedNodeList(nodeAvailableResourceComparator); + } // iterate all nodes for (FSSchedulerNode node : nodeIdList) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 1ea0032dc3d..9efa83d99f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -57,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; public class TestContinuousScheduling extends FairSchedulerTestBase { private ControlledClock mockClock; @@ -302,6 +305,39 @@ public void testThreadLifeCycle() throws InterruptedException { assertNotEquals("One of the threads is still alive", 0, numRetries); } + @Test + public void TestNodeAvailableResourceComparatorTransitivity() { + ClusterNodeTracker clusterNodeTracker = + scheduler.getNodeTracker(); + + List rmNodes = + MockNodes.newNodes(2, 4000, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + clusterNodeTracker.addNode(new FSSchedulerNode(rmNode, false)); + } + + // To simulate unallocated resource changes + new Thread() { + @Override + public void run() { + for (int j = 0; j < 100; j++) { + for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) { + int i = ThreadLocalRandom.current().nextInt(-30, 30); + synchronized (scheduler) { + node.deductUnallocatedResource(Resource.newInstance(i * 1024, i)); + } + } + } + } + }.start(); + + try { + scheduler.continuousSchedulingAttempt(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + @Test public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { scheduler.start();