diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 273f1a9529a..b4ed2b00dae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4005,6 +4005,41 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
"/usr/bin/numactl";
+ /**
+ * Settings for activities manager.
+ */
+ public static final String RM_ACTIVITIES_MANAGER_PREFIX =
+ RM_PREFIX + "activities-manager.";
+ public static final String RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_PREFIX =
+ RM_ACTIVITIES_MANAGER_PREFIX + "scheduler-activities.";
+ public static final String RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX =
+ RM_ACTIVITIES_MANAGER_PREFIX + "app-activities.";
+
+ /** The cleanup interval for activities in milliseconds. **/
+ public static final String RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS =
+ RM_ACTIVITIES_MANAGER_PREFIX + "cleanup-interval-ms";
+ public static final long DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS =
+ 5000L;
+
+ /** Time to live for scheduler activities in milliseconds. **/
+ public static final String RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS =
+ RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_PREFIX + "ttl-ms";
+ public static final long
+ DEFAULT_RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS = 600000L;
+
+ /** Time to live for app activities in milliseconds. **/
+ public static final String RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS =
+ RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX + "ttl-ms";
+ public static final long DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS =
+ 600000L;
+
+ /** Max queue length for app activities. **/
+ public static final String
+ RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH =
+ RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_PREFIX + "max-queue-length";
+ public static final int
+ DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH = 1000;
+
public YarnConfiguration() {
super();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c2bcdb6842f..9741f6c36b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4187,4 +4187,28 @@
yarn.nodemanager.csi-driver.names
+
+
+ The cleanup interval for activities in milliseconds.
+ yarn.resourcemanager.activities-manager.cleanup-interval-ms
+ 5000
+
+
+
+ Time to live for scheduler activities in milliseconds.
+ yarn.resourcemanager.activities-manager.scheduler-activities.ttl-ms
+ 600000
+
+
+
+ Time to live for app activities in milliseconds.
+ yarn.resourcemanager.activities-manager.app-activities.ttl-ms
+ 600000
+
+
+
+ Max queue length for app activities.
+ yarn.resourcemanager.activities-manager.app-activities.max-queue-length
+ 1000
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index d6c21ced121..7d1dd6954e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.AbstractService;
@@ -72,7 +74,10 @@ public class ActivitiesManager extends AbstractService {
private boolean recordNextAvailableNode = false;
private List lastAvailableNodeActivities = null;
private Thread cleanUpThread;
- private int timeThreshold = 600 * 1000;
+ private long activitiesCleanupIntervalMs;
+ private long schedulerActivitiesTTL;
+ private long appActivitiesTTL;
+ private int appActivitiesMaxQueueLength;
private final RMContext rmContext;
private volatile boolean stopped;
private ThreadLocal diagnosticCollectorManager;
@@ -89,6 +94,28 @@ public class ActivitiesManager extends AbstractService {
() -> new DiagnosticsCollectorManager(
new GenericDiagnosticsCollector()));
this.rmContext = rmContext;
+ if (rmContext.getYarnConfiguration() != null) {
+ setupConfForCleanup(rmContext.getYarnConfiguration());
+ }
+ }
+
+ private void setupConfForCleanup(Configuration conf) {
+ activitiesCleanupIntervalMs = conf.getLong(
+ YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+ YarnConfiguration.
+ DEFAULT_RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS);
+ schedulerActivitiesTTL = conf.getLong(
+ YarnConfiguration.RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS,
+ YarnConfiguration.
+ DEFAULT_RM_ACTIVITIES_MANAGER_SCHEDULER_ACTIVITIES_TTL_MS);
+ appActivitiesTTL = conf.getLong(
+ YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
+ YarnConfiguration.
+ DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS);
+ appActivitiesMaxQueueLength = conf.getInt(YarnConfiguration.
+ RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH,
+ YarnConfiguration.
+ DEFAULT_RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_MAX_QUEUE_LENGTH);
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
@@ -152,12 +179,13 @@ public class ActivitiesManager extends AbstractService {
while (!stopped && !Thread.currentThread().isInterrupted()) {
Iterator>> ite =
completedNodeAllocations.entrySet().iterator();
+ long curTS = SystemClock.getInstance().getTime();
while (ite.hasNext()) {
Map.Entry> nodeAllocation = ite.next();
List allocations = nodeAllocation.getValue();
- long currTS = SystemClock.getInstance().getTime();
- if (allocations.size() > 0 && allocations.get(0).getTimeStamp()
- - currTS > timeThreshold) {
+ if (allocations.size() > 0
+ && curTS - allocations.get(0).getTimeStamp()
+ > schedulerActivitiesTTL) {
ite.remove();
}
}
@@ -171,11 +199,29 @@ public class ActivitiesManager extends AbstractService {
if (rmApp == null || rmApp.getFinalApplicationStatus()
!= FinalApplicationStatus.UNDEFINED) {
iteApp.remove();
+ } else {
+ Iterator appActivitiesIt =
+ appAllocation.getValue().iterator();
+ while (appActivitiesIt.hasNext()) {
+ if (curTS - appActivitiesIt.next().getTime()
+ > appActivitiesTTL) {
+ appActivitiesIt.remove();
+ } else {
+ break;
+ }
+ }
+ if (appAllocation.getValue().isEmpty()) {
+ iteApp.remove();
+ LOG.debug("Removed all expired activities from cache for {}.",
+ rmApp.getApplicationId());
+ }
}
}
+ LOG.debug("Remaining apps in app activities cache: {}",
+ completedAppAllocations.keySet());
try {
- Thread.sleep(5000);
+ Thread.sleep(activitiesCleanupIntervalMs);
} catch (InterruptedException e) {
LOG.info(getName() + " thread interrupted");
break;
@@ -290,7 +336,7 @@ public class ActivitiesManager extends AbstractService {
appAllocations = curAppAllocations;
}
}
- if (appAllocations.size() == 1000) {
+ if (appAllocations.size() == appActivitiesMaxQueueLength) {
appAllocations.poll();
}
appAllocations.add(appAllocation);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
index 26d7e401a83..1b4cafe8119 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppActivitiesInfo.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -77,4 +78,9 @@ public class AppActivitiesInfo {
}
}
}
+
+ @VisibleForTesting
+ public List getAllocations() {
+ return allocations;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
index 026edc34669..c9ce73771a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -30,10 +30,13 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
@@ -81,6 +85,8 @@ public class TestActivitiesManager {
@Before
public void setup() {
rmContext = Mockito.mock(RMContext.class);
+ Configuration conf = new Configuration();
+ Mockito.when(rmContext.getYarnConfiguration()).thenReturn(conf);
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getMinimumResourceCapability())
.thenReturn(Resources.none());
@@ -95,6 +101,8 @@ public class TestActivitiesManager {
RMApp mockApp = Mockito.mock(RMApp.class);
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp)
.getApplicationId();
+ Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp)
+ .getFinalApplicationStatus();
rmApps.put(appAttemptId.getApplicationId(), mockApp);
FiCaSchedulerApp app =
new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
@@ -245,6 +253,51 @@ public class TestActivitiesManager {
}
}
+ @Test (timeout = 30000)
+ public void testAppActivitiesTTL() throws Exception {
+ long cleanupIntervalMs = 100;
+ long appActivitiesTTL = 1000;
+ rmContext.getYarnConfiguration()
+ .setLong(YarnConfiguration.RM_ACTIVITIES_MANAGER_CLEANUP_INTERVAL_MS,
+ cleanupIntervalMs);
+ rmContext.getYarnConfiguration()
+ .setLong(YarnConfiguration.RM_ACTIVITIES_MANAGER_APP_ACTIVITIES_TTL_MS,
+ appActivitiesTTL);
+ ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
+ newActivitiesManager.serviceStart();
+ // start recording activities for first app and first node
+ SchedulerApplicationAttempt app = apps.get(0);
+ FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
+ newActivitiesManager
+ .turnOnAppActivitiesRecording(app.getApplicationId(), 3);
+ int numActivities = 10;
+ for (int i = 0; i < numActivities; i++) {
+ ActivitiesLogger.APP
+ .startAppAllocationRecording(newActivitiesManager, node,
+ SystemClock.getInstance().getTime(), app);
+ ActivitiesLogger.APP
+ .recordAppActivityWithoutAllocation(newActivitiesManager, node, app,
+ new SchedulerRequestKey(Priority.newInstance(0), 0, null),
+ ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+ ActivityState.REJECTED);
+ ActivitiesLogger.APP
+ .finishAllocatedAppAllocationRecording(newActivitiesManager,
+ app.getApplicationId(), null, ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
+ }
+ AppActivitiesInfo appActivitiesInfo = newActivitiesManager
+ .getAppActivitiesInfo(app.getApplicationId(), null, null);
+ Assert.assertEquals(numActivities,
+ appActivitiesInfo.getAllocations().size());
+ // sleep until all app activities expired
+ Thread.sleep(cleanupIntervalMs + appActivitiesTTL);
+ // there should be no remaining app activities
+ appActivitiesInfo = newActivitiesManager
+ .getAppActivitiesInfo(app.getApplicationId(), null, null);
+ Assert.assertEquals(0,
+ appActivitiesInfo.getAllocations().size());
+ }
+
/**
* Testing activities manager which can record all history information about
* node allocations.