From 6e954bc25c8fdb5a34403f6947b6fd8413f4e792 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Tue, 9 Dec 2014 14:00:31 -0800 Subject: [PATCH] YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha) (cherry picked from commit a2e07a54561a57a83b943628ebbc53ed5ba52718) (cherry picked from commit 1986ea8dd223267ced3e3aef69980b46e2fef740) (cherry picked from commit 2b827a18d7b4eb41dc0095ea7277239273e7e396) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSAppAttempt.java | 2 +- .../scheduler/fair/FSLeafQueue.java | 149 ++++++++++++------ .../scheduler/fair/TestFSLeafQueue.java | 91 +++++++++++ 4 files changed, 197 insertions(+), 48 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 22bbe763d38..ca1bc2aad1d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -39,6 +39,9 @@ Release 2.6.1 - UNRELEASED YARN-2874. Dead lock in "DelegationTokenRenewer" which blocks RM to execute any further apps. (Naganarasimha G R via kasha) + YARN-2910. FSLeafQueue can throw ConcurrentModificationException. + (Wilfred Spiegelenburg via kasha) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index b9966e7f551..b23ec3ed30e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -172,7 +172,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } @Override - public synchronized Resource getHeadroom() { + public Resource getHeadroom() { final FSQueue queue = (FSQueue) this.queue; SchedulingPolicy policy = queue.getPolicy(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 345ea8b7c36..bbf1be71755 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -23,6 +23,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -50,6 +53,10 @@ public class FSLeafQueue extends FSQueue { new ArrayList(); private final List nonRunnableApps = new ArrayList(); + // get a lock with fair distribution for app list updates + private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); + private final Lock readLock = rwl.readLock(); + private final Lock writeLock = rwl.writeLock(); private Resource demand = Resources.createResource(0); @@ -72,16 +79,26 @@ public class FSLeafQueue extends FSQueue { } public void addApp(FSAppAttempt app, boolean runnable) { - if (runnable) { - runnableApps.add(app); - } else { - nonRunnableApps.add(app); + writeLock.lock(); + try { + if (runnable) { + runnableApps.add(app); + } else { + nonRunnableApps.add(app); + } + } finally { + writeLock.unlock(); } } // for testing void addAppSchedulable(FSAppAttempt appSched) { - runnableApps.add(appSched); + writeLock.lock(); + try { + runnableApps.add(appSched); + } finally { + writeLock.unlock(); + } } /** @@ -89,18 +106,25 @@ public class FSLeafQueue extends FSQueue { * @return whether or not the app was runnable */ public boolean removeApp(FSAppAttempt app) { - if (runnableApps.remove(app)) { - // Update AM resource usage - if (app.isAmRunning() && app.getAMResource() != null) { - Resources.subtractFrom(amResourceUsage, app.getAMResource()); + boolean runnable = false; + writeLock.lock(); + try { + if (runnableApps.remove(app)) { + runnable = true; + } else if (nonRunnableApps.remove(app)) { + runnable = false; //nop, runnable is initialised to false already + } else { + throw new IllegalStateException("Given app to remove " + app + + " does not exist in queue " + this); } - return true; - } else if (nonRunnableApps.remove(app)) { - return false; - } else { - throw new IllegalStateException("Given app to remove " + app + - " does not exist in queue " + this); + } finally { + writeLock.unlock(); } + // Update AM resource usage if needed + if (runnable && app.isAmRunning() && app.getAMResource() != null) { + Resources.subtractFrom(amResourceUsage, app.getAMResource()); + } + return runnable; } public Collection getRunnableAppSchedulables() { @@ -114,11 +138,16 @@ public class FSLeafQueue extends FSQueue { @Override public void collectSchedulerApplications( Collection apps) { - for (FSAppAttempt appSched : runnableApps) { - apps.add(appSched.getApplicationAttemptId()); - } - for (FSAppAttempt appSched : nonRunnableApps) { - apps.add(appSched.getApplicationAttemptId()); + readLock.lock(); + try { + for (FSAppAttempt appSched : runnableApps) { + apps.add(appSched.getApplicationAttemptId()); + } + for (FSAppAttempt appSched : nonRunnableApps) { + apps.add(appSched.getApplicationAttemptId()); + } + } finally { + readLock.unlock(); } } @@ -144,11 +173,16 @@ public class FSLeafQueue extends FSQueue { @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (FSAppAttempt app : runnableApps) { - Resources.addTo(usage, app.getResourceUsage()); - } - for (FSAppAttempt app : nonRunnableApps) { - Resources.addTo(usage, app.getResourceUsage()); + readLock.lock(); + try { + for (FSAppAttempt app : runnableApps) { + Resources.addTo(usage, app.getResourceUsage()); + } + for (FSAppAttempt app : nonRunnableApps) { + Resources.addTo(usage, app.getResourceUsage()); + } + } finally { + readLock.unlock(); } return usage; } @@ -164,17 +198,22 @@ public class FSLeafQueue extends FSQueue { Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); demand = Resources.createResource(0); - for (FSAppAttempt sched : runnableApps) { - if (Resources.equals(demand, maxRes)) { - break; + readLock.lock(); + try { + for (FSAppAttempt sched : runnableApps) { + if (Resources.equals(demand, maxRes)) { + break; + } + updateDemandForApp(sched, maxRes); } - updateDemandForApp(sched, maxRes); - } - for (FSAppAttempt sched : nonRunnableApps) { - if (Resources.equals(demand, maxRes)) { - break; + for (FSAppAttempt sched : nonRunnableApps) { + if (Resources.equals(demand, maxRes)) { + break; + } + updateDemandForApp(sched, maxRes); } - updateDemandForApp(sched, maxRes); + } finally { + readLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand @@ -198,7 +237,8 @@ public class FSLeafQueue extends FSQueue { public Resource assignContainer(FSSchedulerNode node) { Resource assigned = Resources.none(); if (LOG.isDebugEnabled()) { - LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName()); + LOG.debug("Node " + node.getNodeName() + " offered to queue: " + + getName()); } if (!assignContainerPreCheck(node)) { @@ -206,16 +246,26 @@ public class FSLeafQueue extends FSQueue { } Comparator comparator = policy.getComparator(); - Collections.sort(runnableApps, comparator); - for (FSAppAttempt sched : runnableApps) { - if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { - continue; - } + writeLock.lock(); + try { + Collections.sort(runnableApps, comparator); + } finally { + writeLock.unlock(); + } + readLock.lock(); + try { + for (FSAppAttempt sched : runnableApps) { + if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { + continue; + } - assigned = sched.assignContainer(node); - if (!assigned.equals(Resources.none())) { - break; + assigned = sched.assignContainer(node); + if (!assigned.equals(Resources.none())) { + break; + } } + } finally { + readLock.unlock(); } return assigned; } @@ -237,11 +287,16 @@ public class FSLeafQueue extends FSQueue { // Choose the app that is most over fair share Comparator comparator = policy.getComparator(); FSAppAttempt candidateSched = null; - for (FSAppAttempt sched : runnableApps) { - if (candidateSched == null || - comparator.compare(sched, candidateSched) > 0) { - candidateSched = sched; + readLock.lock(); + try { + for (FSAppAttempt sched : runnableApps) { + if (candidateSched == null || + comparator.compare(sched, candidateSched) > 0) { + candidateSched = sched; + } } + } finally { + readLock.unlock(); } // Preempt from the selected app diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 97736bedd04..385ea0be76b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -28,12 +28,22 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -222,4 +232,85 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { assertFalse(queueB1.isStarvedForFairShare()); assertFalse(queueB2.isStarvedForFairShare()); } + + @Test + public void testConcurrentAccess() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + String queueName = "root.queue1"; + final FSLeafQueue schedulable = scheduler.getQueueManager(). + getLeafQueue(queueName, true); + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = resourceManager.getRMContext(); + final FSAppAttempt app = + new FSAppAttempt(scheduler, applicationAttemptId, "user1", + schedulable, null, rmContext); + + // this needs to be in sync with the number of runnables declared below + int testThreads = 2; + List runnables = new ArrayList(); + + // add applications to modify the list + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 500; i++) { + schedulable.addAppSchedulable(app); + } + } + }); + + // iterate over the list a couple of times in a different thread + runnables.add(new Runnable() { + @Override + public void run() { + for (int i=0; i < 500; i++) { + schedulable.getResourceUsage(); + } + } + }); + + final List exceptions = Collections.synchronizedList( + new ArrayList()); + final ExecutorService threadPool = Executors.newFixedThreadPool( + testThreads); + + try { + final CountDownLatch allExecutorThreadsReady = + new CountDownLatch(testThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(testThreads); + for (final Runnable submittedTestRunnable : runnables) { + threadPool.submit(new Runnable() { + public void run() { + allExecutorThreadsReady.countDown(); + try { + startBlocker.await(); + submittedTestRunnable.run(); + } catch (final Throwable e) { + exceptions.add(e); + } finally { + allDone.countDown(); + } + } + }); + } + // wait until all threads are ready + allExecutorThreadsReady.await(); + // start all test runners + startBlocker.countDown(); + int testTimeout = 2; + assertTrue("Timeout waiting for more than " + testTimeout + " seconds", + allDone.await(testTimeout, TimeUnit.SECONDS)); + } catch (InterruptedException ie) { + exceptions.add(ie); + } finally { + threadPool.shutdownNow(); + } + assertTrue("Test failed with exception(s)" + exceptions, + exceptions.isEmpty()); + } }