YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha)

(cherry picked from commit a2e07a5456)
(cherry picked from commit 1986ea8dd2)
(cherry picked from commit 2b827a18d7b4eb41dc0095ea7277239273e7e396)
This commit is contained in:
Karthik Kambatla 2014-12-09 14:00:31 -08:00 committed by Vinod Kumar Vavilapalli
parent 2a2888f2f3
commit 6e954bc25c
4 changed files with 197 additions and 48 deletions

View File

@ -39,6 +39,9 @@ Release 2.6.1 - UNRELEASED
YARN-2874. Dead lock in "DelegationTokenRenewer" which blocks RM to execute YARN-2874. Dead lock in "DelegationTokenRenewer" which blocks RM to execute
any further apps. (Naganarasimha G R via kasha) any further apps. (Naganarasimha G R via kasha)
YARN-2910. FSLeafQueue can throw ConcurrentModificationException.
(Wilfred Spiegelenburg via kasha)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -172,7 +172,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} }
@Override @Override
public synchronized Resource getHeadroom() { public Resource getHeadroom() {
final FSQueue queue = (FSQueue) this.queue; final FSQueue queue = (FSQueue) this.queue;
SchedulingPolicy policy = queue.getPolicy(); SchedulingPolicy policy = queue.getPolicy();

View File

@ -23,6 +23,9 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -50,6 +53,10 @@ public class FSLeafQueue extends FSQueue {
new ArrayList<FSAppAttempt>(); new ArrayList<FSAppAttempt>();
private final List<FSAppAttempt> nonRunnableApps = private final List<FSAppAttempt> nonRunnableApps =
new ArrayList<FSAppAttempt>(); new ArrayList<FSAppAttempt>();
// get a lock with fair distribution for app list updates
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
private Resource demand = Resources.createResource(0); private Resource demand = Resources.createResource(0);
@ -72,16 +79,26 @@ public class FSLeafQueue extends FSQueue {
} }
public void addApp(FSAppAttempt app, boolean runnable) { public void addApp(FSAppAttempt app, boolean runnable) {
writeLock.lock();
try {
if (runnable) { if (runnable) {
runnableApps.add(app); runnableApps.add(app);
} else { } else {
nonRunnableApps.add(app); nonRunnableApps.add(app);
} }
} finally {
writeLock.unlock();
}
} }
// for testing // for testing
void addAppSchedulable(FSAppAttempt appSched) { void addAppSchedulable(FSAppAttempt appSched) {
writeLock.lock();
try {
runnableApps.add(appSched); runnableApps.add(appSched);
} finally {
writeLock.unlock();
}
} }
/** /**
@ -89,18 +106,25 @@ public class FSLeafQueue extends FSQueue {
* @return whether or not the app was runnable * @return whether or not the app was runnable
*/ */
public boolean removeApp(FSAppAttempt app) { public boolean removeApp(FSAppAttempt app) {
boolean runnable = false;
writeLock.lock();
try {
if (runnableApps.remove(app)) { if (runnableApps.remove(app)) {
// Update AM resource usage runnable = true;
if (app.isAmRunning() && app.getAMResource() != null) {
Resources.subtractFrom(amResourceUsage, app.getAMResource());
}
return true;
} else if (nonRunnableApps.remove(app)) { } else if (nonRunnableApps.remove(app)) {
return false; runnable = false; //nop, runnable is initialised to false already
} else { } else {
throw new IllegalStateException("Given app to remove " + app + throw new IllegalStateException("Given app to remove " + app +
" does not exist in queue " + this); " does not exist in queue " + this);
} }
} finally {
writeLock.unlock();
}
// Update AM resource usage if needed
if (runnable && app.isAmRunning() && app.getAMResource() != null) {
Resources.subtractFrom(amResourceUsage, app.getAMResource());
}
return runnable;
} }
public Collection<FSAppAttempt> getRunnableAppSchedulables() { public Collection<FSAppAttempt> getRunnableAppSchedulables() {
@ -114,12 +138,17 @@ public class FSLeafQueue extends FSQueue {
@Override @Override
public void collectSchedulerApplications( public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) { Collection<ApplicationAttemptId> apps) {
readLock.lock();
try {
for (FSAppAttempt appSched : runnableApps) { for (FSAppAttempt appSched : runnableApps) {
apps.add(appSched.getApplicationAttemptId()); apps.add(appSched.getApplicationAttemptId());
} }
for (FSAppAttempt appSched : nonRunnableApps) { for (FSAppAttempt appSched : nonRunnableApps) {
apps.add(appSched.getApplicationAttemptId()); apps.add(appSched.getApplicationAttemptId());
} }
} finally {
readLock.unlock();
}
} }
@Override @Override
@ -144,12 +173,17 @@ public class FSLeafQueue extends FSQueue {
@Override @Override
public Resource getResourceUsage() { public Resource getResourceUsage() {
Resource usage = Resources.createResource(0); Resource usage = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) { for (FSAppAttempt app : runnableApps) {
Resources.addTo(usage, app.getResourceUsage()); Resources.addTo(usage, app.getResourceUsage());
} }
for (FSAppAttempt app : nonRunnableApps) { for (FSAppAttempt app : nonRunnableApps) {
Resources.addTo(usage, app.getResourceUsage()); Resources.addTo(usage, app.getResourceUsage());
} }
} finally {
readLock.unlock();
}
return usage; return usage;
} }
@ -164,6 +198,8 @@ public class FSLeafQueue extends FSQueue {
Resource maxRes = scheduler.getAllocationConfiguration() Resource maxRes = scheduler.getAllocationConfiguration()
.getMaxResources(getName()); .getMaxResources(getName());
demand = Resources.createResource(0); demand = Resources.createResource(0);
readLock.lock();
try {
for (FSAppAttempt sched : runnableApps) { for (FSAppAttempt sched : runnableApps) {
if (Resources.equals(demand, maxRes)) { if (Resources.equals(demand, maxRes)) {
break; break;
@ -176,6 +212,9 @@ public class FSLeafQueue extends FSQueue {
} }
updateDemandForApp(sched, maxRes); updateDemandForApp(sched, maxRes);
} }
} finally {
readLock.unlock();
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes); + "; the max is " + maxRes);
@ -198,7 +237,8 @@ public class FSLeafQueue extends FSQueue {
public Resource assignContainer(FSSchedulerNode node) { public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none(); Resource assigned = Resources.none();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName()); LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
getName());
} }
if (!assignContainerPreCheck(node)) { if (!assignContainerPreCheck(node)) {
@ -206,7 +246,14 @@ public class FSLeafQueue extends FSQueue {
} }
Comparator<Schedulable> comparator = policy.getComparator(); Comparator<Schedulable> comparator = policy.getComparator();
writeLock.lock();
try {
Collections.sort(runnableApps, comparator); Collections.sort(runnableApps, comparator);
} finally {
writeLock.unlock();
}
readLock.lock();
try {
for (FSAppAttempt sched : runnableApps) { for (FSAppAttempt sched : runnableApps) {
if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
continue; continue;
@ -217,6 +264,9 @@ public class FSLeafQueue extends FSQueue {
break; break;
} }
} }
} finally {
readLock.unlock();
}
return assigned; return assigned;
} }
@ -237,12 +287,17 @@ public class FSLeafQueue extends FSQueue {
// Choose the app that is most over fair share // Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator(); Comparator<Schedulable> comparator = policy.getComparator();
FSAppAttempt candidateSched = null; FSAppAttempt candidateSched = null;
readLock.lock();
try {
for (FSAppAttempt sched : runnableApps) { for (FSAppAttempt sched : runnableApps) {
if (candidateSched == null || if (candidateSched == null ||
comparator.compare(sched, candidateSched) > 0) { comparator.compare(sched, candidateSched) > 0) {
candidateSched = sched; candidateSched = sched;
} }
} }
} finally {
readLock.unlock();
}
// Preempt from the selected app // Preempt from the selected app
if (candidateSched != null) { if (candidateSched != null) {

View File

@ -28,12 +28,22 @@ import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -222,4 +232,85 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
assertFalse(queueB1.isStarvedForFairShare()); assertFalse(queueB1.isStarvedForFairShare());
assertFalse(queueB2.isStarvedForFairShare()); assertFalse(queueB2.isStarvedForFairShare());
} }
@Test
public void testConcurrentAccess() {
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
String queueName = "root.queue1";
final FSLeafQueue schedulable = scheduler.getQueueManager().
getLeafQueue(queueName, true);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = resourceManager.getRMContext();
final FSAppAttempt app =
new FSAppAttempt(scheduler, applicationAttemptId, "user1",
schedulable, null, rmContext);
// this needs to be in sync with the number of runnables declared below
int testThreads = 2;
List<Runnable> runnables = new ArrayList<Runnable>();
// add applications to modify the list
runnables.add(new Runnable() {
@Override
public void run() {
for (int i=0; i < 500; i++) {
schedulable.addAppSchedulable(app);
}
}
});
// iterate over the list a couple of times in a different thread
runnables.add(new Runnable() {
@Override
public void run() {
for (int i=0; i < 500; i++) {
schedulable.getResourceUsage();
}
}
});
final List<Throwable> exceptions = Collections.synchronizedList(
new ArrayList<Throwable>());
final ExecutorService threadPool = Executors.newFixedThreadPool(
testThreads);
try {
final CountDownLatch allExecutorThreadsReady =
new CountDownLatch(testThreads);
final CountDownLatch startBlocker = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(testThreads);
for (final Runnable submittedTestRunnable : runnables) {
threadPool.submit(new Runnable() {
public void run() {
allExecutorThreadsReady.countDown();
try {
startBlocker.await();
submittedTestRunnable.run();
} catch (final Throwable e) {
exceptions.add(e);
} finally {
allDone.countDown();
}
}
});
}
// wait until all threads are ready
allExecutorThreadsReady.await();
// start all test runners
startBlocker.countDown();
int testTimeout = 2;
assertTrue("Timeout waiting for more than " + testTimeout + " seconds",
allDone.await(testTimeout, TimeUnit.SECONDS));
} catch (InterruptedException ie) {
exceptions.add(ie);
} finally {
threadPool.shutdownNow();
}
assertTrue("Test failed with exception(s)" + exceptions,
exceptions.isEmpty());
}
} }