diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index db17b42d021..af4a0012826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -286,7 +287,8 @@ public abstract class SchedulerNode { * container. * @param resource Resources to deduct. */ - private synchronized void deductUnallocatedResource(Resource resource) { + @VisibleForTesting + public synchronized void deductUnallocatedResource(Resource resource) { if (resource == null) { LOG.error("Invalid deduction of null resource for " + rmNode.getNodeAddress()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f3fde76f06a..98c14ace6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -913,8 +913,12 @@ public class FairScheduler extends void continuousSchedulingAttempt() throws InterruptedException { long start = getClock().getTime(); - List nodeIdList = - nodeTracker.sortedNodeList(nodeAvailableResourceComparator); + List nodeIdList; + // Hold a lock to prevent comparator order changes due to changes of node + // unallocated resources + synchronized (this) { + nodeIdList = nodeTracker.sortedNodeList(nodeAvailableResourceComparator); + } // iterate all nodes for (FSSchedulerNode node : nodeIdList) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 1ea0032dc3d..9efa83d99f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -57,6 +59,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; public class TestContinuousScheduling extends FairSchedulerTestBase { private ControlledClock mockClock; @@ -302,6 +305,39 @@ public class TestContinuousScheduling extends FairSchedulerTestBase { assertNotEquals("One of the threads is still alive", 0, numRetries); } + @Test + public void TestNodeAvailableResourceComparatorTransitivity() { + ClusterNodeTracker clusterNodeTracker = + scheduler.getNodeTracker(); + + List rmNodes = + MockNodes.newNodes(2, 4000, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + clusterNodeTracker.addNode(new FSSchedulerNode(rmNode, false)); + } + + // To simulate unallocated resource changes + new Thread() { + @Override + public void run() { + for (int j = 0; j < 100; j++) { + for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) { + int i = ThreadLocalRandom.current().nextInt(-30, 30); + synchronized (scheduler) { + node.deductUnallocatedResource(Resource.newInstance(i * 1024, i)); + } + } + } + } + }.start(); + + try { + scheduler.continuousSchedulingAttempt(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + @Test public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { scheduler.start();