YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. Contributed by Rohith Sharmaks.

This commit is contained in:
Tsuyoshi Ozawa 2015-01-05 00:08:31 +09:00
parent 947578c1c1
commit ddc5be48fc
3 changed files with 91 additions and 2 deletions

View File

@ -308,6 +308,9 @@ Release 2.7.0 - UNRELEASED
YARN-2991. Fixed DrainDispatcher to reuse the draining code path in YARN-2991. Fixed DrainDispatcher to reuse the draining code path in
AsyncDispatcher. (Rohith Sharmaks via zjshen) AsyncDispatcher. (Rohith Sharmaks via zjshen)
YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
(Rohith Sharmaks via ozawa)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1878,7 +1878,7 @@ public class LeafQueue extends AbstractCSQueue {
} }
// return a single Resource capturing the overall amount of pending resources // return a single Resource capturing the overall amount of pending resources
public Resource getTotalResourcePending() { public synchronized Resource getTotalResourcePending() {
Resource ret = BuilderUtils.newResource(0, 0); Resource ret = BuilderUtils.newResource(0, 0);
for (FiCaSchedulerApp f : activeApplications) { for (FiCaSchedulerApp f : activeApplications) {
Resources.addTo(ret, f.getTotalPendingRequests()); Resources.addTo(ret, f.getTotalPendingRequests());
@ -1887,7 +1887,7 @@ public class LeafQueue extends AbstractCSQueue {
} }
@Override @Override
public void collectSchedulerApplications( public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) { Collection<ApplicationAttemptId> apps) {
for (FiCaSchedulerApp pendingApp : pendingApplications) { for (FiCaSchedulerApp pendingApp : pendingApplications) {
apps.add(pendingApp.getApplicationAttemptId()); apps.add(pendingApp.getApplicationAttemptId());

View File

@ -37,11 +37,13 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -2353,6 +2356,89 @@ public class TestLeafQueue {
} }
} }
/**
 * Regression test for YARN-2922: checks that
 * {@code LeafQueue#collectSchedulerApplications} does not raise a
 * {@link ConcurrentModificationException} while another thread is finishing
 * application attempts on the same queue.
 *
 * One thread submits 10000 application attempts, waits at a barrier and then
 * finishes all of them; a second thread waits at the same barrier and then
 * collects the queue's applications. Any ConcurrentModificationException
 * caught by the collecting thread is recorded and fails the test.
 *
 * @throws Exception if the resource manager cannot be started or a join is
 *           interrupted
 */
@Test
public void testConcurrentAccess() throws Exception {
  YarnConfiguration conf = new YarnConfiguration();
  MockRM rm = new MockRM();
  rm.init(conf);
  rm.start();
  try {
    final String queue = "default";
    final String user = "user";
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    final LeafQueue defaultQueue = (LeafQueue) cs.getQueue(queue);

    final List<FiCaSchedulerApp> listOfApps =
        createListOfApps(10000, user, defaultQueue);

    // Two parties: the submit/remove thread and the collecting thread.
    final CyclicBarrier cb = new CyclicBarrier(2);
    // Written only by the collecting thread and read after join(), so a
    // plain ArrayList is sufficient here.
    final List<ConcurrentModificationException> conException =
        new ArrayList<ConcurrentModificationException>();

    Thread submitAndRemove = new Thread(new Runnable() {

      @Override
      public void run() {
        for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
          defaultQueue.submitApplicationAttempt(fiCaSchedulerApp, user);
        }
        try {
          cb.await();
        } catch (Exception e) {
          // Best effort: if the rendezvous fails, carry on without it.
        }
        for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
          defaultQueue.finishApplicationAttempt(fiCaSchedulerApp, queue);
        }
      }
    }, "SubmitAndRemoveApplicationAttempt Thread");

    Thread getAppsInQueue = new Thread(new Runnable() {
      List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();

      @Override
      public void run() {
        try {
          try {
            cb.await();
          } catch (Exception e) {
            // Best effort: if the rendezvous fails, carry on without it.
          }
          // Runs while the other thread is finishing attempts.
          defaultQueue.collectSchedulerApplications(apps);
        } catch (ConcurrentModificationException e) {
          conException.add(e);
        }
      }

    }, "GetAppsInQueue Thread");

    submitAndRemove.start();
    getAppsInQueue.start();

    submitAndRemove.join();
    getAppsInQueue.join();

    assertTrue("ConcurrentModificationException is thrown",
        conException.isEmpty());
  } finally {
    // Stop the RM even when the assertion or a join fails, so a failing run
    // does not leave a started resource manager behind for later tests.
    rm.stop();
  }
}
/**
 * Creates {@code noOfApps} scheduler applications bound to the given queue.
 *
 * Each application receives an attempt id obtained from
 * {@code TestUtils.getMockApplicationAttemptId(index, 0)}, a fresh mock
 * {@code ActiveUsersManager} and the test's {@code spyRMContext}.
 *
 * @param noOfApps number of applications to create
 * @param user user name passed to every created application
 * @param defaultQueue queue passed to every created application
 * @return the created applications, in creation order
 */
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
    LeafQueue defaultQueue) {
  List<FiCaSchedulerApp> created = new ArrayList<FiCaSchedulerApp>();
  int appIndex = 0;
  while (appIndex < noOfApps) {
    ApplicationAttemptId attemptId =
        TestUtils.getMockApplicationAttemptId(appIndex, 0);
    created.add(new FiCaSchedulerApp(attemptId, user, defaultQueue,
        mock(ActiveUsersManager.class), spyRMContext));
    appIndex++;
  }
  return created;
}
private CapacitySchedulerContext mockCSContext( private CapacitySchedulerContext mockCSContext(
CapacitySchedulerConfiguration csConf, Resource clusterResource) { CapacitySchedulerConfiguration csConf, Resource clusterResource) {
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);