diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 67ce4377daa..02872a5b16d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -258,6 +258,9 @@ Release 2.8.0 - UNRELEASED
YARN-3547. FairScheduler: Apps that have no resource demand should not participate
scheduling. (Xianyin Xin via kasha)
+ YARN-3259. FairScheduler: Trigger fairShare updates on node events.
+ (Anubhav Dhoot via kasha)
+
BUG FIXES
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
index c2282fdb736..20d2af91411 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSOpDurations.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsCollector;
@@ -116,4 +117,9 @@ public void addUpdateCallDuration(long value) {
public void addPreemptCallDuration(long value) {
preemptCall.add(value);
}
+
+ @VisibleForTesting
+ public boolean hasUpdateThreadRunChanged() {
+ return updateThreadRun.changed();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 07b32714a2a..64b3f12f67e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -103,9 +103,9 @@
* of the root queue in the typical fair scheduling fashion. Then, the children
* distribute the resources assigned to them to their children in the same
* fashion. Applications may only be scheduled on leaf queues. Queues can be
- * specified as children of other queues by placing them as sub-elements of their
- * parents in the fair scheduler configuration file.
- *
+ * specified as children of other queues by placing them as sub-elements of
+ * their parents in the fair scheduler configuration file.
+ *
* A queue's name starts with the names of its parents, with periods as
* separators. So a queue named "queue1" under the root named, would be
* referred to as "root.queue1", and a queue named "queue2" under a queue
@@ -142,6 +142,8 @@ public class FairScheduler extends
@VisibleForTesting
Thread updateThread;
+ private final Object updateThreadMonitor = new Object();
+
@VisibleForTesting
Thread schedulingThread;
// timeout to join when we stop this service
@@ -246,6 +248,13 @@ public QueueManager getQueueManager() {
return queueMgr;
}
+ // Allows UpdateThread to start processing without waiting till updateInterval
+ void triggerUpdate() {
+ synchronized (updateThreadMonitor) {
+ updateThreadMonitor.notify();
+ }
+ }
+
/**
* Thread which calls {@link FairScheduler#update()} every
* updateInterval
milliseconds.
@@ -256,7 +265,9 @@ private class UpdateThread extends Thread {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
- Thread.sleep(updateInterval);
+ synchronized (updateThreadMonitor) {
+ updateThreadMonitor.wait(updateInterval);
+ }
long start = getClock().getTime();
update();
preemptTasksIfNecessary();
@@ -838,6 +849,8 @@ private synchronized void addNode(RMNode node) {
updateRootQueueMetrics();
updateMaximumAllocation(schedulerNode, true);
+ triggerUpdate();
+
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() +
@@ -853,6 +866,8 @@ private synchronized void removeNode(RMNode rmNode) {
Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
updateRootQueueMetrics();
+ triggerUpdate();
+
// Remove running containers
List runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java
new file mode 100644
index 00000000000..94298f42ea1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingUpdate.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSchedulingUpdate extends FairSchedulerTestBase {
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+
+ // Make the update loop to never finish to ensure zero update calls
+ conf.setInt(
+ FairSchedulerConfiguration.UPDATE_INTERVAL_MS,
+ Integer.MAX_VALUE);
+ return conf;
+ }
+
+ @Before
+ public void setup() {
+ conf = createConfiguration();
+ resourceManager = new MockRM(conf);
+ resourceManager.start();
+
+ scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+ }
+
+ @After
+ public void teardown() {
+ if (resourceManager != null) {
+ resourceManager.stop();
+ resourceManager = null;
+ }
+ }
+
+ @Test (timeout = 3000)
+ public void testSchedulingUpdateOnNodeJoinLeave() throws InterruptedException {
+
+ verifyNoCalls();
+
+ // Add one node
+ String host = "127.0.0.1";
+ final int memory = 4096;
+ final int cores = 4;
+ RMNode node1 = MockNodes.newNodeInfo(
+ 1, Resources.createResource(memory, cores), 1, host);
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ long expectedCalls = 1;
+ verifyExpectedCalls(expectedCalls, memory, cores);
+
+ // Remove the node
+ NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent2);
+
+ expectedCalls = 2;
+ verifyExpectedCalls(expectedCalls, 0, 0);
+ }
+
+ private void verifyExpectedCalls(long expectedCalls, int memory, int vcores)
+ throws InterruptedException {
+ boolean verified = false;
+ int count = 0;
+ while (count < 100) {
+ if (scheduler.fsOpDurations.hasUpdateThreadRunChanged()) {
+ break;
+ }
+ count++;
+ Thread.sleep(10);
+ }
+ assertTrue("Update Thread has not run based on its metrics",
+ scheduler.fsOpDurations.hasUpdateThreadRunChanged());
+ assertEquals("Root queue metrics memory does not have expected value",
+ memory, scheduler.getRootQueueMetrics().getAvailableMB());
+ assertEquals("Root queue metrics cpu does not have expected value",
+ vcores, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
+
+ MetricsCollectorImpl collector = new MetricsCollectorImpl();
+ scheduler.fsOpDurations.getMetrics(collector, true);
+ MetricsRecord record = collector.getRecords().get(0);
+ for (AbstractMetric abstractMetric : record.metrics()) {
+ if (abstractMetric.name().contains("UpdateThreadRunNumOps")) {
+ assertEquals("Update Thread did not run expected number of times " +
+ "based on metric record count",
+ expectedCalls,
+ abstractMetric.value());
+ verified = true;
+ }
+ }
+ assertTrue("Did not find metric for UpdateThreadRunNumOps", verified);
+ }
+
+ private void verifyNoCalls() {
+ assertFalse("Update thread should not have executed",
+ scheduler.fsOpDurations.hasUpdateThreadRunChanged());
+ assertEquals("Scheduler queue memory should not have been updated",
+ 0, scheduler.getRootQueueMetrics().getAvailableMB());
+ assertEquals("Scheduler queue cpu should not have been updated",
+ 0,scheduler.getRootQueueMetrics().getAvailableVirtualCores());
+ }
+}