YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha)
(cherry picked from commit a2e07a5456
)
This commit is contained in:
parent
784f481473
commit
1986ea8dd2
|
@ -173,6 +173,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2931. PublicLocalizer may fail until directory is initialized by
|
YARN-2931. PublicLocalizer may fail until directory is initialized by
|
||||||
LocalizeRunner. (Anubhav Dhoot via kasha)
|
LocalizeRunner. (Anubhav Dhoot via kasha)
|
||||||
|
|
||||||
|
YARN-2910. FSLeafQueue can throw ConcurrentModificationException.
|
||||||
|
(Wilfred Spiegelenburg via kasha)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -172,7 +172,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Resource getHeadroom() {
|
public Resource getHeadroom() {
|
||||||
final FSQueue queue = (FSQueue) this.queue;
|
final FSQueue queue = (FSQueue) this.queue;
|
||||||
SchedulingPolicy policy = queue.getPolicy();
|
SchedulingPolicy policy = queue.getPolicy();
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,9 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -50,6 +53,10 @@ public class FSLeafQueue extends FSQueue {
|
||||||
new ArrayList<FSAppAttempt>();
|
new ArrayList<FSAppAttempt>();
|
||||||
private final List<FSAppAttempt> nonRunnableApps =
|
private final List<FSAppAttempt> nonRunnableApps =
|
||||||
new ArrayList<FSAppAttempt>();
|
new ArrayList<FSAppAttempt>();
|
||||||
|
// get a lock with fair distribution for app list updates
|
||||||
|
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
|
||||||
|
private final Lock readLock = rwl.readLock();
|
||||||
|
private final Lock writeLock = rwl.writeLock();
|
||||||
|
|
||||||
private Resource demand = Resources.createResource(0);
|
private Resource demand = Resources.createResource(0);
|
||||||
|
|
||||||
|
@ -72,16 +79,26 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addApp(FSAppAttempt app, boolean runnable) {
|
public void addApp(FSAppAttempt app, boolean runnable) {
|
||||||
if (runnable) {
|
writeLock.lock();
|
||||||
runnableApps.add(app);
|
try {
|
||||||
} else {
|
if (runnable) {
|
||||||
nonRunnableApps.add(app);
|
runnableApps.add(app);
|
||||||
|
} else {
|
||||||
|
nonRunnableApps.add(app);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// for testing
|
// for testing
|
||||||
void addAppSchedulable(FSAppAttempt appSched) {
|
void addAppSchedulable(FSAppAttempt appSched) {
|
||||||
runnableApps.add(appSched);
|
writeLock.lock();
|
||||||
|
try {
|
||||||
|
runnableApps.add(appSched);
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -89,18 +106,25 @@ public class FSLeafQueue extends FSQueue {
|
||||||
* @return whether or not the app was runnable
|
* @return whether or not the app was runnable
|
||||||
*/
|
*/
|
||||||
public boolean removeApp(FSAppAttempt app) {
|
public boolean removeApp(FSAppAttempt app) {
|
||||||
if (runnableApps.remove(app)) {
|
boolean runnable = false;
|
||||||
// Update AM resource usage
|
writeLock.lock();
|
||||||
if (app.isAmRunning() && app.getAMResource() != null) {
|
try {
|
||||||
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
if (runnableApps.remove(app)) {
|
||||||
|
runnable = true;
|
||||||
|
} else if (nonRunnableApps.remove(app)) {
|
||||||
|
runnable = false; //nop, runnable is initialised to false already
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("Given app to remove " + app +
|
||||||
|
" does not exist in queue " + this);
|
||||||
}
|
}
|
||||||
return true;
|
} finally {
|
||||||
} else if (nonRunnableApps.remove(app)) {
|
writeLock.unlock();
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
throw new IllegalStateException("Given app to remove " + app +
|
|
||||||
" does not exist in queue " + this);
|
|
||||||
}
|
}
|
||||||
|
// Update AM resource usage if needed
|
||||||
|
if (runnable && app.isAmRunning() && app.getAMResource() != null) {
|
||||||
|
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
||||||
|
}
|
||||||
|
return runnable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<FSAppAttempt> getRunnableAppSchedulables() {
|
public Collection<FSAppAttempt> getRunnableAppSchedulables() {
|
||||||
|
@ -114,11 +138,16 @@ public class FSLeafQueue extends FSQueue {
|
||||||
@Override
|
@Override
|
||||||
public void collectSchedulerApplications(
|
public void collectSchedulerApplications(
|
||||||
Collection<ApplicationAttemptId> apps) {
|
Collection<ApplicationAttemptId> apps) {
|
||||||
for (FSAppAttempt appSched : runnableApps) {
|
readLock.lock();
|
||||||
apps.add(appSched.getApplicationAttemptId());
|
try {
|
||||||
}
|
for (FSAppAttempt appSched : runnableApps) {
|
||||||
for (FSAppAttempt appSched : nonRunnableApps) {
|
apps.add(appSched.getApplicationAttemptId());
|
||||||
apps.add(appSched.getApplicationAttemptId());
|
}
|
||||||
|
for (FSAppAttempt appSched : nonRunnableApps) {
|
||||||
|
apps.add(appSched.getApplicationAttemptId());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,11 +173,16 @@ public class FSLeafQueue extends FSQueue {
|
||||||
@Override
|
@Override
|
||||||
public Resource getResourceUsage() {
|
public Resource getResourceUsage() {
|
||||||
Resource usage = Resources.createResource(0);
|
Resource usage = Resources.createResource(0);
|
||||||
for (FSAppAttempt app : runnableApps) {
|
readLock.lock();
|
||||||
Resources.addTo(usage, app.getResourceUsage());
|
try {
|
||||||
}
|
for (FSAppAttempt app : runnableApps) {
|
||||||
for (FSAppAttempt app : nonRunnableApps) {
|
Resources.addTo(usage, app.getResourceUsage());
|
||||||
Resources.addTo(usage, app.getResourceUsage());
|
}
|
||||||
|
for (FSAppAttempt app : nonRunnableApps) {
|
||||||
|
Resources.addTo(usage, app.getResourceUsage());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
return usage;
|
return usage;
|
||||||
}
|
}
|
||||||
|
@ -164,17 +198,22 @@ public class FSLeafQueue extends FSQueue {
|
||||||
Resource maxRes = scheduler.getAllocationConfiguration()
|
Resource maxRes = scheduler.getAllocationConfiguration()
|
||||||
.getMaxResources(getName());
|
.getMaxResources(getName());
|
||||||
demand = Resources.createResource(0);
|
demand = Resources.createResource(0);
|
||||||
for (FSAppAttempt sched : runnableApps) {
|
readLock.lock();
|
||||||
if (Resources.equals(demand, maxRes)) {
|
try {
|
||||||
break;
|
for (FSAppAttempt sched : runnableApps) {
|
||||||
|
if (Resources.equals(demand, maxRes)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
updateDemandForApp(sched, maxRes);
|
||||||
}
|
}
|
||||||
updateDemandForApp(sched, maxRes);
|
for (FSAppAttempt sched : nonRunnableApps) {
|
||||||
}
|
if (Resources.equals(demand, maxRes)) {
|
||||||
for (FSAppAttempt sched : nonRunnableApps) {
|
break;
|
||||||
if (Resources.equals(demand, maxRes)) {
|
}
|
||||||
break;
|
updateDemandForApp(sched, maxRes);
|
||||||
}
|
}
|
||||||
updateDemandForApp(sched, maxRes);
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("The updated demand for " + getName() + " is " + demand
|
LOG.debug("The updated demand for " + getName() + " is " + demand
|
||||||
|
@ -198,7 +237,8 @@ public class FSLeafQueue extends FSQueue {
|
||||||
public Resource assignContainer(FSSchedulerNode node) {
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
Resource assigned = Resources.none();
|
Resource assigned = Resources.none();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName());
|
LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
|
||||||
|
getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!assignContainerPreCheck(node)) {
|
if (!assignContainerPreCheck(node)) {
|
||||||
|
@ -206,16 +246,26 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
Comparator<Schedulable> comparator = policy.getComparator();
|
Comparator<Schedulable> comparator = policy.getComparator();
|
||||||
Collections.sort(runnableApps, comparator);
|
writeLock.lock();
|
||||||
for (FSAppAttempt sched : runnableApps) {
|
try {
|
||||||
if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
|
Collections.sort(runnableApps, comparator);
|
||||||
continue;
|
} finally {
|
||||||
}
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
readLock.lock();
|
||||||
|
try {
|
||||||
|
for (FSAppAttempt sched : runnableApps) {
|
||||||
|
if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
assigned = sched.assignContainer(node);
|
assigned = sched.assignContainer(node);
|
||||||
if (!assigned.equals(Resources.none())) {
|
if (!assigned.equals(Resources.none())) {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
return assigned;
|
return assigned;
|
||||||
}
|
}
|
||||||
|
@ -237,11 +287,16 @@ public class FSLeafQueue extends FSQueue {
|
||||||
// Choose the app that is most over fair share
|
// Choose the app that is most over fair share
|
||||||
Comparator<Schedulable> comparator = policy.getComparator();
|
Comparator<Schedulable> comparator = policy.getComparator();
|
||||||
FSAppAttempt candidateSched = null;
|
FSAppAttempt candidateSched = null;
|
||||||
for (FSAppAttempt sched : runnableApps) {
|
readLock.lock();
|
||||||
if (candidateSched == null ||
|
try {
|
||||||
comparator.compare(sched, candidateSched) > 0) {
|
for (FSAppAttempt sched : runnableApps) {
|
||||||
candidateSched = sched;
|
if (candidateSched == null ||
|
||||||
|
comparator.compare(sched, candidateSched) > 0) {
|
||||||
|
candidateSched = sched;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Preempt from the selected app
|
// Preempt from the selected app
|
||||||
|
|
|
@ -28,12 +28,22 @@ import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
@ -222,4 +232,85 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
|
||||||
assertFalse(queueB1.isStarvedForFairShare());
|
assertFalse(queueB1.isStarvedForFairShare());
|
||||||
assertFalse(queueB2.isStarvedForFairShare());
|
assertFalse(queueB2.isStarvedForFairShare());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcurrentAccess() {
|
||||||
|
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
|
||||||
|
resourceManager = new MockRM(conf);
|
||||||
|
resourceManager.start();
|
||||||
|
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
|
||||||
|
|
||||||
|
String queueName = "root.queue1";
|
||||||
|
final FSLeafQueue schedulable = scheduler.getQueueManager().
|
||||||
|
getLeafQueue(queueName, true);
|
||||||
|
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
|
||||||
|
RMContext rmContext = resourceManager.getRMContext();
|
||||||
|
final FSAppAttempt app =
|
||||||
|
new FSAppAttempt(scheduler, applicationAttemptId, "user1",
|
||||||
|
schedulable, null, rmContext);
|
||||||
|
|
||||||
|
// this needs to be in sync with the number of runnables declared below
|
||||||
|
int testThreads = 2;
|
||||||
|
List<Runnable> runnables = new ArrayList<Runnable>();
|
||||||
|
|
||||||
|
// add applications to modify the list
|
||||||
|
runnables.add(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i=0; i < 500; i++) {
|
||||||
|
schedulable.addAppSchedulable(app);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// iterate over the list a couple of times in a different thread
|
||||||
|
runnables.add(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i=0; i < 500; i++) {
|
||||||
|
schedulable.getResourceUsage();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final List<Throwable> exceptions = Collections.synchronizedList(
|
||||||
|
new ArrayList<Throwable>());
|
||||||
|
final ExecutorService threadPool = Executors.newFixedThreadPool(
|
||||||
|
testThreads);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final CountDownLatch allExecutorThreadsReady =
|
||||||
|
new CountDownLatch(testThreads);
|
||||||
|
final CountDownLatch startBlocker = new CountDownLatch(1);
|
||||||
|
final CountDownLatch allDone = new CountDownLatch(testThreads);
|
||||||
|
for (final Runnable submittedTestRunnable : runnables) {
|
||||||
|
threadPool.submit(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
allExecutorThreadsReady.countDown();
|
||||||
|
try {
|
||||||
|
startBlocker.await();
|
||||||
|
submittedTestRunnable.run();
|
||||||
|
} catch (final Throwable e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
} finally {
|
||||||
|
allDone.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// wait until all threads are ready
|
||||||
|
allExecutorThreadsReady.await();
|
||||||
|
// start all test runners
|
||||||
|
startBlocker.countDown();
|
||||||
|
int testTimeout = 2;
|
||||||
|
assertTrue("Timeout waiting for more than " + testTimeout + " seconds",
|
||||||
|
allDone.await(testTimeout, TimeUnit.SECONDS));
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
exceptions.add(ie);
|
||||||
|
} finally {
|
||||||
|
threadPool.shutdownNow();
|
||||||
|
}
|
||||||
|
assertTrue("Test failed with exception(s)" + exceptions,
|
||||||
|
exceptions.isEmpty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue