diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6bbcdcb1e11..3fb4a3765d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4038,7 +4038,7 @@ public static boolean areNodeLabelsEnabled(
RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH =
RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX + "max-queue-length";
public static final int
- DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH = 1000;
+ DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH = 100;
public YarnConfiguration() {
super();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9741f6c36b1..7e52dd151cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4209,6 +4209,6 @@
Max queue length for app activities.
yarn.resourcemanager.activities-manager.app-activities.max-queue-length
- 1000
+ 100
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 4149ac1565d..05ac01b3789 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
@@ -79,7 +80,8 @@ public class ActivitiesManager extends AbstractService {
private long activitiesCleanupIntervalMs;
private long schedulerActivitiesTTL;
private long appActivitiesTTL;
- private int appActivitiesMaxQueueLength;
+ private volatile int appActivitiesMaxQueueLength;
+ private int configuredAppActivitiesMaxQueueLength;
private final RMContext rmContext;
private volatile boolean stopped;
private ThreadLocal diagnosticCollectorManager;
@@ -114,10 +116,11 @@ private void setupConfForCleanup(Configuration conf) {
YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
YarnConfiguration.
DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS);
- appActivitiesMaxQueueLength = conf.getInt(YarnConfiguration.
+ configuredAppActivitiesMaxQueueLength = conf.getInt(YarnConfiguration.
RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
YarnConfiguration.
DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH);
+ appActivitiesMaxQueueLength = configuredAppActivitiesMaxQueueLength;
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
@@ -228,6 +231,44 @@ public void turnOnAppActivitiesRecording(ApplicationId applicationId,
recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS);
}
+ private void dynamicallyUpdateAppActivitiesMaxQueueLengthIfNeeded() {
+ if (rmContext.getRMNodes() == null) {
+ return;
+ }
+ if (rmContext.getScheduler() instanceof CapacityScheduler) {
+ CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
+ if (!cs.isMultiNodePlacementEnabled()) {
+ int numNodes = rmContext.getRMNodes().size();
+ int newAppActivitiesMaxQueueLength;
+ int numAsyncSchedulerThreads = cs.getNumAsyncSchedulerThreads();
+ if (numAsyncSchedulerThreads > 0) {
+ newAppActivitiesMaxQueueLength =
+ Math.max(configuredAppActivitiesMaxQueueLength,
+ numNodes * numAsyncSchedulerThreads);
+ } else {
+ newAppActivitiesMaxQueueLength =
+ Math.max(configuredAppActivitiesMaxQueueLength,
+ (int) (numNodes * 1.2));
+ }
+ if (appActivitiesMaxQueueLength != newAppActivitiesMaxQueueLength) {
+ LOG.info("Update max queue length of app activities from {} to {},"
+ + " configured={}, numNodes={}, numAsyncSchedulerThreads={}"
+ + " when multi-node placement disabled.",
+ appActivitiesMaxQueueLength, newAppActivitiesMaxQueueLength,
+ configuredAppActivitiesMaxQueueLength, numNodes,
+ numAsyncSchedulerThreads);
+ appActivitiesMaxQueueLength = newAppActivitiesMaxQueueLength;
+ }
+ } else if (appActivitiesMaxQueueLength
+ != configuredAppActivitiesMaxQueueLength) {
+ LOG.info("Update max queue length of app activities from {} to {}"
+ + " when multi-node placement enabled.",
+ appActivitiesMaxQueueLength, configuredAppActivitiesMaxQueueLength);
+ appActivitiesMaxQueueLength = configuredAppActivitiesMaxQueueLength;
+ }
+ }
+ }
+
@Override
protected void serviceStart() throws Exception {
cleanUpThread = new Thread(new Runnable() {
@@ -277,6 +318,8 @@ public void run() {
LOG.debug("Remaining apps in app activities cache: {}",
completedAppAllocations.keySet());
+ // dynamically update max queue length of app activities if needed
+ dynamicallyUpdateAppActivitiesMaxQueueLengthIfNeeded();
try {
Thread.sleep(activitiesCleanupIntervalMs);
} catch (InterruptedException e) {
@@ -567,4 +610,9 @@ private static String getDiagnostics(DiagnosticsCollector dc) {
}
return sb.toString();
}
+
+  /**
+   * Exposes the max queue length of app activities currently in effect, so
+   * that tests can observe it.
+   *
+   * @return the current value of {@code appActivitiesMaxQueueLength}
+   */
+  @VisibleForTesting
+  public int getAppActivitiesMaxQueueLength() {
+    return this.appActivitiesMaxQueueLength;
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e59abee6b51..d6f20d7fa29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -3185,4 +3185,12 @@ private LeafQueue autoCreateLeafQueue(
public void resetSchedulerMetrics() {
CapacitySchedulerMetrics.destroy();
}
+
+  /**
+   * Reports this scheduler's multi-node placement flag.
+   *
+   * @return the value of {@code multiNodePlacementEnabled}
+   */
+  public boolean isMultiNodePlacementEnabled() {
+    return this.multiNodePlacementEnabled;
+  }
+
+  /**
+   * Counts the async scheduler threads held by this scheduler.
+   *
+   * @return the size of {@code asyncSchedulerThreads}, or 0 when that
+   *         collection is {@code null}
+   */
+  public int getNumAsyncSchedulerThreads() {
+    if (asyncSchedulerThreads == null) {
+      return 0;
+    }
+    return asyncSchedulerThreads.size();
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
index 2bf6b23ed70..35b88726347 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -25,13 +25,16 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -40,10 +43,12 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -393,6 +398,64 @@ public void testAppActivitiesPerformance() {
testingTimes);
}
+  /**
+   * Verifies that a started {@link ActivitiesManager} adjusts the max queue
+   * length of app activities for a mocked {@link CapacityScheduler} with 5
+   * nodes and a configured length of 1:
+   * <ul>
+   *   <li>async scheduling (3 threads), multi-node placement disabled:
+   *       5 * 3 = 15;</li>
+   *   <li>no async threads, multi-node placement disabled:
+   *       (int) (5 * 1.2) = 6;</li>
+   *   <li>multi-node placement enabled: back to the configured value 1.</li>
+   * </ul>
+   */
+  @Test (timeout = 10000)
+  public void testAppActivitiesMaxQueueLengthUpdate()
+      throws TimeoutException, InterruptedException {
+    Configuration conf = new Configuration();
+    int configuredAppActivitiesMaxQueueLength = 1;
+    conf.setInt(YarnConfiguration.
+        RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
+        configuredAppActivitiesMaxQueueLength);
+    // Keep the cleanup interval well below the 3s wait used further down.
+    conf.setInt(YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+        500);
+    ConcurrentMap<NodeId, RMNode> mockNodes = new ConcurrentHashMap<>();
+    int numNodes = 5;
+    for (int i = 0; i < numNodes; i++) {
+      mockNodes.put(NodeId.newInstance("node" + i, 0), mock(RMNode.class));
+    }
+    CapacityScheduler cs = Mockito.mock(CapacityScheduler.class);
+    RMContext mockRMContext = Mockito.mock(RMContext.class);
+    Mockito.when(mockRMContext.getRMNodes()).thenReturn(mockNodes);
+    Mockito.when(mockRMContext.getYarnConfiguration()).thenReturn(conf);
+    Mockito.when(mockRMContext.getScheduler()).thenReturn(cs);
+    /*
+     * Test for async-scheduling with multi-node placement disabled
+     */
+    Mockito.when(cs.isMultiNodePlacementEnabled()).thenReturn(false);
+    int numAsyncSchedulerThreads = 3;
+    Mockito.when(cs.getNumAsyncSchedulerThreads())
+        .thenReturn(numAsyncSchedulerThreads);
+    ActivitiesManager newActivitiesManager =
+        new ActivitiesManager(mockRMContext);
+    Assert.assertEquals(1,
+        newActivitiesManager.getAppActivitiesMaxQueueLength());
+    newActivitiesManager.init(conf);
+    newActivitiesManager.start();
+    try {
+      GenericTestUtils.waitFor(
+          () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
+              == numNodes * numAsyncSchedulerThreads, 100, 3000);
+      Assert.assertEquals(15,
+          newActivitiesManager.getAppActivitiesMaxQueueLength());
+      /*
+       * Test for HB-driven scheduling with multi-node placement disabled
+       */
+      Mockito.when(cs.getNumAsyncSchedulerThreads()).thenReturn(0);
+      // Truncate exactly like the production code does; comparing the int
+      // length with the raw double product only matches when the product
+      // happens to be a whole number.
+      int expectedHeartbeatDrivenLength = (int) (numNodes * 1.2);
+      GenericTestUtils.waitFor(
+          () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
+              == expectedHeartbeatDrivenLength, 100, 3000);
+      Assert.assertEquals(6,
+          newActivitiesManager.getAppActivitiesMaxQueueLength());
+      /*
+       * Test for scheduling with multi-node placement enabled
+       */
+      Mockito.when(cs.isMultiNodePlacementEnabled()).thenReturn(true);
+      GenericTestUtils.waitFor(
+          () -> newActivitiesManager.getAppActivitiesMaxQueueLength()
+              == configuredAppActivitiesMaxQueueLength, 100, 3000);
+      Assert.assertEquals(1,
+          newActivitiesManager.getAppActivitiesMaxQueueLength());
+    } finally {
+      // Stop the service so its cleanup thread does not outlive the test.
+      newActivitiesManager.stop();
+    }
+  }
+
+
private void testManyTimes(String testingName,
Supplier supplier, int testingTimes) {
long totalTime = 0;