YARN-1567. In Fair Scheduler, allow empty queues to change between leaf and parent on allocation file reload (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1558229 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
511f5136f8
commit
04c7b8afb5
|
@ -205,6 +205,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
dependencies and thus compact the dependency list for leaf modules.
|
dependencies and thus compact the dependency list for leaf modules.
|
||||||
(Alejandro Abdelnur via vinodkv)
|
(Alejandro Abdelnur via vinodkv)
|
||||||
|
|
||||||
|
YARN-1567. In Fair Scheduler, allow empty queues to change between leaf and
|
||||||
|
parent on allocation file reload (Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -509,4 +509,8 @@ public class QueueMetrics implements MetricsSource {
|
||||||
public int getActiveApps() {
|
public int getActiveApps() {
|
||||||
return activeApplications.value();
|
return activeApplications.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MetricsSystem getMetricsSystem() {
|
||||||
|
return metricsSystem;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,8 @@ public class AllocationConfiguration {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
QueuePlacementPolicy placementPolicy;
|
QueuePlacementPolicy placementPolicy;
|
||||||
|
|
||||||
private final Set<String> queueNames;
|
@VisibleForTesting
|
||||||
|
Set<String> queueNames;
|
||||||
|
|
||||||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||||
Map<String, Resource> maxQueueResources,
|
Map<String, Resource> maxQueueResources,
|
||||||
|
|
|
@ -214,7 +214,7 @@ public class FSLeafQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<FSQueue> getChildQueues() {
|
public List<FSQueue> getChildQueues() {
|
||||||
return new ArrayList<FSQueue>(1);
|
return new ArrayList<FSQueue>(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -157,7 +157,7 @@ public class FSParentQueue extends FSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<FSQueue> getChildQueues() {
|
public List<FSQueue> getChildQueues() {
|
||||||
return childQueues;
|
return childQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
@ -158,7 +159,7 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
||||||
/**
|
/**
|
||||||
* Gets the children of this queue, if any.
|
* Gets the children of this queue, if any.
|
||||||
*/
|
*/
|
||||||
public abstract Collection<FSQueue> getChildQueues();
|
public abstract List<FSQueue> getChildQueues();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds all applications in the queue and its subqueues to the given collection.
|
* Adds all applications in the queue and its subqueues to the given collection.
|
||||||
|
|
|
@ -85,9 +85,7 @@ public class QueueManager {
|
||||||
* could be referred to as just "parent1.queue2".
|
* could be referred to as just "parent1.queue2".
|
||||||
*/
|
*/
|
||||||
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
||||||
if (!name.startsWith(ROOT_QUEUE + ".")) {
|
name = ensureRootPrefix(name);
|
||||||
name = ROOT_QUEUE + "." + name;
|
|
||||||
}
|
|
||||||
synchronized (queues) {
|
synchronized (queues) {
|
||||||
FSQueue queue = queues.get(name);
|
FSQueue queue = queues.get(name);
|
||||||
if (queue == null && create) {
|
if (queue == null && create) {
|
||||||
|
@ -174,13 +172,107 @@ public class QueueManager {
|
||||||
return leafQueue;
|
return leafQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make way for the given leaf queue if possible, by removing incompatible
|
||||||
|
* queues with no apps in them. Incompatibility could be due to
|
||||||
|
* (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in
|
||||||
|
* the ancestry of leafToCreate.
|
||||||
|
*
|
||||||
|
* We will never remove the root queue or the default queue in this way.
|
||||||
|
*
|
||||||
|
* @return true if we can create leafToCreate or it already exists.
|
||||||
|
*/
|
||||||
|
private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
|
||||||
|
leafToCreate = ensureRootPrefix(leafToCreate);
|
||||||
|
|
||||||
|
// Ensure leafToCreate is not root and doesn't have the default queue in its
|
||||||
|
// ancestry.
|
||||||
|
if (leafToCreate.equals(ROOT_QUEUE) ||
|
||||||
|
leafToCreate.startsWith(
|
||||||
|
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
FSQueue queue = queues.get(leafToCreate);
|
||||||
|
// Queue exists already.
|
||||||
|
if (queue != null) {
|
||||||
|
if (queue instanceof FSLeafQueue) {
|
||||||
|
// If it's an already existing leaf, we're ok.
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// If it's an existing parent queue, remove it if it's empty.
|
||||||
|
return removeQueueIfEmpty(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue doesn't exist already. Check if the new queue would be created
|
||||||
|
// under an existing leaf queue. If so, try removing that leaf queue.
|
||||||
|
int sepIndex = leafToCreate.length();
|
||||||
|
sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
|
||||||
|
while (sepIndex != -1) {
|
||||||
|
String prefixString = leafToCreate.substring(0, sepIndex);
|
||||||
|
FSQueue prefixQueue = queues.get(prefixString);
|
||||||
|
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
|
||||||
|
return removeQueueIfEmpty(prefixQueue);
|
||||||
|
}
|
||||||
|
sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the queue if it and its descendents are all empty.
|
||||||
|
* @param queue
|
||||||
|
* @return true if removed, false otherwise
|
||||||
|
*/
|
||||||
|
private boolean removeQueueIfEmpty(FSQueue queue) {
|
||||||
|
if (isEmpty(queue)) {
|
||||||
|
removeQueue(queue);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a queue and all its descendents.
|
||||||
|
*/
|
||||||
|
private void removeQueue(FSQueue queue) {
|
||||||
|
if (queue instanceof FSLeafQueue) {
|
||||||
|
leafQueues.remove(queue);
|
||||||
|
} else {
|
||||||
|
List<FSQueue> childQueues = queue.getChildQueues();
|
||||||
|
while (!childQueues.isEmpty()) {
|
||||||
|
removeQueue(childQueues.get(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
queues.remove(queue.getName());
|
||||||
|
queue.getParent().getChildQueues().remove(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if there are no applications, running or not, in the given
|
||||||
|
* queue or any of its descendents.
|
||||||
|
*/
|
||||||
|
protected boolean isEmpty(FSQueue queue) {
|
||||||
|
if (queue instanceof FSLeafQueue) {
|
||||||
|
FSLeafQueue leafQueue = (FSLeafQueue)queue;
|
||||||
|
return queue.getNumRunnableApps() == 0 &&
|
||||||
|
leafQueue.getNonRunnableAppSchedulables().isEmpty();
|
||||||
|
} else {
|
||||||
|
for (FSQueue child : queue.getChildQueues()) {
|
||||||
|
if (!isEmpty(child)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets a queue by name.
|
* Gets a queue by name.
|
||||||
*/
|
*/
|
||||||
public FSQueue getQueue(String name) {
|
public FSQueue getQueue(String name) {
|
||||||
if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
|
name = ensureRootPrefix(name);
|
||||||
name = ROOT_QUEUE + "." + name;
|
|
||||||
}
|
|
||||||
synchronized (queues) {
|
synchronized (queues) {
|
||||||
return queues.get(name);
|
return queues.get(name);
|
||||||
}
|
}
|
||||||
|
@ -190,9 +282,7 @@ public class QueueManager {
|
||||||
* Return whether a queue exists already.
|
* Return whether a queue exists already.
|
||||||
*/
|
*/
|
||||||
public boolean exists(String name) {
|
public boolean exists(String name) {
|
||||||
if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
|
name = ensureRootPrefix(name);
|
||||||
name = ROOT_QUEUE + "." + name;
|
|
||||||
}
|
|
||||||
synchronized (queues) {
|
synchronized (queues) {
|
||||||
return queues.containsKey(name);
|
return queues.containsKey(name);
|
||||||
}
|
}
|
||||||
|
@ -214,10 +304,19 @@ public class QueueManager {
|
||||||
return queues.values();
|
return queues.values();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String ensureRootPrefix(String name) {
|
||||||
|
if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
|
||||||
|
name = ROOT_QUEUE + "." + name;
|
||||||
|
}
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
||||||
// Make sure all queues exist
|
// Make sure all queues exist
|
||||||
for (String name : queueConf.getQueueNames()) {
|
for (String name : queueConf.getQueueNames()) {
|
||||||
getLeafQueue(name, true);
|
if (removeEmptyIncompatibleQueues(name)) {
|
||||||
|
getLeafQueue(name, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (FSQueue queue : queues.values()) {
|
for (FSQueue queue : queues.values()) {
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
public class TestQueueManager {
|
||||||
|
private FairSchedulerConfiguration conf;
|
||||||
|
private QueueManager queueManager;
|
||||||
|
private Set<FSQueue> notEmptyQueues;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf = new FairSchedulerConfiguration();
|
||||||
|
FairScheduler scheduler = mock(FairScheduler.class);
|
||||||
|
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
||||||
|
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
|
||||||
|
when(scheduler.getConf()).thenReturn(conf);
|
||||||
|
SystemClock clock = new SystemClock();
|
||||||
|
when(scheduler.getClock()).thenReturn(clock);
|
||||||
|
notEmptyQueues = new HashSet<FSQueue>();
|
||||||
|
queueManager = new QueueManager(scheduler) {
|
||||||
|
@Override
|
||||||
|
public boolean isEmpty(FSQueue queue) {
|
||||||
|
return !notEmptyQueues.contains(queue);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
FSQueueMetrics.forQueue("root", null, true, conf);
|
||||||
|
queueManager.initialize(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReloadTurnsLeafQueueIntoParent() throws Exception {
|
||||||
|
updateConfiguredQueues(queueManager, "queue1");
|
||||||
|
|
||||||
|
// When no apps are running in the leaf queue, should be fine turning it
|
||||||
|
// into a parent.
|
||||||
|
updateConfiguredQueues(queueManager, "queue1.queue2");
|
||||||
|
assertNull(queueManager.getLeafQueue("queue1", false));
|
||||||
|
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||||
|
|
||||||
|
// When leaf queues are empty, should be ok deleting them and
|
||||||
|
// turning parent into a leaf.
|
||||||
|
updateConfiguredQueues(queueManager, "queue1");
|
||||||
|
assertNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||||
|
assertNotNull(queueManager.getLeafQueue("queue1", false));
|
||||||
|
|
||||||
|
// When apps exist in leaf queue, we shouldn't be able to create
|
||||||
|
// children under it, but things should work otherwise.
|
||||||
|
notEmptyQueues.add(queueManager.getLeafQueue("queue1", false));
|
||||||
|
updateConfiguredQueues(queueManager, "queue1.queue2");
|
||||||
|
assertNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||||
|
assertNotNull(queueManager.getLeafQueue("queue1", false));
|
||||||
|
|
||||||
|
// When apps exist in leaf queues under a parent queue, shouldn't be
|
||||||
|
// able to turn it into a leaf queue, but things should work otherwise.
|
||||||
|
notEmptyQueues.clear();
|
||||||
|
updateConfiguredQueues(queueManager, "queue1.queue2");
|
||||||
|
notEmptyQueues.add(queueManager.getQueue("root.queue1"));
|
||||||
|
updateConfiguredQueues(queueManager, "queue1");
|
||||||
|
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||||
|
assertNull(queueManager.getLeafQueue("queue1", false));
|
||||||
|
|
||||||
|
// Should never to be able to create a queue under the default queue
|
||||||
|
updateConfiguredQueues(queueManager, "default.queue3");
|
||||||
|
assertNull(queueManager.getLeafQueue("default.queue3", false));
|
||||||
|
assertNotNull(queueManager.getLeafQueue("default", false));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateConfiguredQueues(QueueManager queueMgr, String... confQueues) {
|
||||||
|
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
||||||
|
allocConf.queueNames = Sets.newHashSet(confQueues);
|
||||||
|
queueMgr.updateAllocationConfiguration(allocConf);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue