diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index a9d3c058e65..ea17feda8c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -30,6 +30,7 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.TreeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -419,8 +420,10 @@ public class FifoIntraQueuePreemptionPlugin String userName = app.getUser(); TempUserPerPartition tmpUser = usersPerPartition.get(userName); if (tmpUser == null) { - ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) - .getResourceUsage(); + // User might have already been removed, but preemption still accounts for this app, + // therefore reinserting the user will not cause a memory leak + User user = tq.leafQueue.getOrCreateUser(userName); + ResourceUsage userResourceUsage = user.getResourceUsage(); // perUserAMUsed was populated with running apps, now we are looping // through both running and pending apps. @@ -428,8 +431,7 @@ public class FifoIntraQueuePreemptionPlugin amUsed = (userSpecificAmUsed == null) ? Resources.none() : userSpecificAmUsed; - tmpUser = new TempUserPerPartition( - tq.leafQueue.getUser(userName), tq.queueName, + tmpUser = new TempUserPerPartition(user, tq.queueName, Resources.clone(userResourceUsage.getUsed(partition)), Resources.clone(amUsed), Resources.clone(userResourceUsage.getReserved(partition)), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a4e2a822ead..c7f42d1ad36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -519,6 +519,11 @@ public class LeafQueue extends AbstractCSQueue { return usersManager.getUser(userName); } + @VisibleForTesting + public User getOrCreateUser(String userName) { + return usersManager.getUserAndAddIfAbsent(userName); + } + @Private public List getPriorityACLs() { readLock.lock(); @@ -2007,7 +2012,12 @@ public class LeafQueue extends AbstractCSQueue { public void incAMUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel, + User user = getUser(application.getUser()); + if (user == null) { + return; + } + + user.getResourceUsage().incAMUsed(nodeLabel, resourceToInc); // ResourceUsage has its own lock, no addition lock needs here. usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc); @@ -2015,7 +2025,12 @@ public class LeafQueue extends AbstractCSQueue { public void decAMUsedResource(String nodeLabel, Resource resourceToDec, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel, + User user = getUser(application.getUser()); + if (user == null) { + return; + } + + user.getResourceUsage().decAMUsed(nodeLabel, resourceToDec); // ResourceUsage has its own lock, no addition lock needs here. usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec); @@ -2103,7 +2118,7 @@ public class LeafQueue extends AbstractCSQueue { for (FiCaSchedulerApp app : getApplications()) { String userName = app.getUser(); if (!userNameToHeadroom.containsKey(userName)) { - User user = getUser(userName); + User user = getUsersManager().getUserAndAddIfAbsent(userName); Resource headroom = Resources.subtract( getResourceLimitForActiveUsers(app.getUser(), clusterResources, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 8ba13f029f8..94df9ab22c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -817,6 +817,7 @@ public class UsersManager implements AbstractUsersManager { lQueue.getMinimumAllocation()); if (LOG.isDebugEnabled()) { + float weight = lQueue.getUserWeights().getByUser(userName); LOG.debug("User limit computation for " + userName + ", in queue: " + lQueue.getQueuePath() + ", userLimitPercent=" + lQueue.getUserLimit() @@ -834,7 +835,7 @@ public class UsersManager implements AbstractUsersManager { + ", Partition=" + nodePartition + ", resourceUsed=" + resourceUsed + ", maxUserLimit=" + maxUserLimit - + ", userWeight=" + getUser(userName).getWeight() + + ", userWeight=" + weight ); } return userLimitResource; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java index b16861257ff..aa00a1a4388 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java @@ -165,6 +165,7 @@ class MockApplications { user.setResourceUsage(userResourceUsage.get(userName)); } when(queue.getUser(eq(userName))).thenReturn(user); + when(queue.getOrCreateUser(eq(userName))).thenReturn(user); when(queue.getResourceLimitForAllUsers(eq(userName), any(Resource.class), anyString(), any(SchedulingMode.class))) .thenReturn(userLimit);