YARN-4204. ConcurrentModificationException in FairSchedulerQueueInfo. (adhoot)

This commit is contained in:
Anubhav Dhoot 2015-09-27 20:52:38 -07:00
parent 892ade689f
commit fb2e525c07
5 changed files with 100 additions and 13 deletions

View File

@ -902,6 +902,8 @@ Release 2.8.0 - UNRELEASED
YARN-4044. Running applications information changes such as movequeue is not published to YARN-4044. Running applications information changes such as movequeue is not published to
TimeLine server. (Sunil G via rohithsharmaks) TimeLine server. (Sunil G via rohithsharmaks)
YARN-4204. ConcurrentModificationException in FairSchedulerQueueInfo. (adhoot)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -70,7 +70,8 @@ public class FSLeafQueue extends FSQueue {
private Resource amResourceUsage; private Resource amResourceUsage;
private final ActiveUsersManager activeUsersManager; private final ActiveUsersManager activeUsersManager;
public static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
public FSLeafQueue(String name, FairScheduler scheduler, public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) { FSParentQueue parent) {
super(name, scheduler, parent); super(name, scheduler, parent);
@ -383,7 +384,7 @@ public class FSLeafQueue extends FSQueue {
@Override @Override
public List<FSQueue> getChildQueues() { public List<FSQueue> getChildQueues() {
return new ArrayList<FSQueue>(1); return EMPTY_LIST;
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -279,7 +280,7 @@ public class FSParentQueue extends FSQueue {
public List<FSQueue> getChildQueues() { public List<FSQueue> getChildQueues() {
readLock.lock(); readLock.lock();
try { try {
return Collections.unmodifiableList(childQueues); return ImmutableList.copyOf(childQueues);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -28,6 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -295,17 +296,18 @@ public class QueueManager {
* Remove a queue and all its descendents. * Remove a queue and all its descendents.
*/ */
private void removeQueue(FSQueue queue) { private void removeQueue(FSQueue queue) {
if (queue instanceof FSLeafQueue) { synchronized (queues) {
leafQueues.remove(queue); if (queue instanceof FSLeafQueue) {
} else { leafQueues.remove(queue);
List<FSQueue> childQueues = queue.getChildQueues(); } else {
while (!childQueues.isEmpty()) { for (FSQueue childQueue:queue.getChildQueues()) {
removeQueue(childQueues.get(0)); removeQueue(childQueue);
}
} }
queues.remove(queue.getName());
FSParentQueue parent = queue.getParent();
parent.removeChildQueue(queue);
} }
queues.remove(queue.getName());
FSParentQueue parent = queue.getParent();
parent.removeChildQueue(queue);
} }
/** /**
@ -360,7 +362,9 @@ public class QueueManager {
* Get a collection of all queues * Get a collection of all queues
*/ */
public Collection<FSQueue> getQueues() { public Collection<FSQueue> getQueues() {
return queues.values(); synchronized (queues) {
return ImmutableList.copyOf(queues.values());
}
} }
private String ensureRootPrefix(String name) { private String ensureRootPrefix(String name) {

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFSParentQueue {
private FairSchedulerConfiguration conf;
private QueueManager queueManager;
private Set<FSQueue> notEmptyQueues;
@Before
public void setUp() throws Exception {
conf = new FairSchedulerConfiguration();
FairScheduler scheduler = mock(FairScheduler.class);
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf);
SystemClock clock = new SystemClock();
when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>();
queueManager = new QueueManager(scheduler) {
@Override
public boolean isEmpty(FSQueue queue) {
return !notEmptyQueues.contains(queue);
}
};
FSQueueMetrics.forQueue("root", null, true, conf);
queueManager.initialize(conf);
}
@Test
public void testConcurrentChangeToGetChildQueue() {
queueManager.getLeafQueue("parent.child", true);
queueManager.getLeafQueue("parent.child2", true);
FSParentQueue test = queueManager.getParentQueue("parent", false);
assertEquals(2, test.getChildQueues().size());
boolean first = true;
int childQueuesFound = 0;
for (FSQueue childQueue:test.getChildQueues()) {
if (first) {
first = false;
queueManager.getLeafQueue("parent.child3", true);
}
childQueuesFound++;
}
assertEquals(2, childQueuesFound);
assertEquals(3, test.getChildQueues().size());
}
}