From 02599bda04e0ef46f4628b006f2430ad63cac97e Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 28 Aug 2017 12:22:56 +0530 Subject: [PATCH] YARN-7051. Avoid concurrent modification exception in FifoIntraQueuePreemptionPlugin. Contributed by Eric Payne. --- .../FifoIntraQueuePreemptionPlugin.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 4bf6760287d..00ae3dacb9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -397,10 +397,16 @@ public class FifoIntraQueuePreemptionPlugin ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) .getResourceUsage(); + // perUserAMUsed was populated with running apps, now we are looping + // through both running and pending apps. + Resource userSpecificAmUsed = perUserAMUsed.get(userName); + amUsed = (userSpecificAmUsed == null) + ? Resources.none() : userSpecificAmUsed; + TempUserPerPartition tmpUser = new TempUserPerPartition( tq.leafQueue.getUser(userName), tq.queueName, Resources.clone(userResourceUsage.getUsed(partition)), - Resources.clone(perUserAMUsed.get(userName)), + Resources.clone(userSpecificAmUsed), Resources.clone(userResourceUsage.getReserved(partition)), Resources.none()); @@ -547,15 +553,17 @@ public class FifoIntraQueuePreemptionPlugin Collection runningApps = leafQueue.getApplications(); Resource amUsed = Resources.createResource(0, 0); - for (FiCaSchedulerApp app : runningApps) { - Resource userAMResource = perUserAMUsed.get(app.getUser()); - if (null == userAMResource) { - userAMResource = Resources.createResource(0, 0); - perUserAMUsed.put(app.getUser(), userAMResource); - } + synchronized (leafQueue) { + for (FiCaSchedulerApp app : runningApps) { + Resource userAMResource = perUserAMUsed.get(app.getUser()); + if (null == userAMResource) { + userAMResource = Resources.createResource(0, 0); + perUserAMUsed.put(app.getUser(), userAMResource); + } - Resources.addTo(userAMResource, app.getAMResource(partition)); - Resources.addTo(amUsed, app.getAMResource(partition)); + Resources.addTo(userAMResource, app.getAMResource(partition)); + Resources.addTo(amUsed, app.getAMResource(partition)); + } } return amUsed;