From 6699198b54bf6360c164a6ce7552c8b91a318c59 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Thu, 4 Mar 2021 17:18:35 +0100 Subject: [PATCH] YARN-10532. Capacity Scheduler Auto Queue Creation: Allow auto delete queue when queue is not being used. Contributed by Qi Zhu. --- .../scheduler/capacity/AbstractCSQueue.java | 47 +++ .../AutoCreatedQueueDeletionPolicy.java | 163 ++++++++++ .../scheduler/capacity/CapacityScheduler.java | 64 +++- .../CapacitySchedulerConfiguration.java | 56 ++++ .../scheduler/capacity/LeafQueue.java | 10 + .../scheduler/capacity/ParentQueue.java | 34 +- .../event/AutoCreatedQueueDeletionEvent.java | 32 ++ .../scheduler/event/SchedulerEventType.java | 5 +- .../monitor/TestSchedulingMonitor.java | 43 +++ .../TestAutoCreatedQueueDeletionPolicy.java | 184 +++++++++++ ...CapacitySchedulerNewQueueAutoCreation.java | 303 +++++++++++++++++- 11 files changed, 931 insertions(+), 10 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 
28e2d54c494..e5380fa9528 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -154,6 +155,10 @@ public abstract class AbstractCSQueue implements CSQueue { // is it a dynamic queue? private boolean dynamicQueue = false; + // The timestamp of the last submitted application to this queue. + // Only applies to dynamic queues. + private long lastSubmittedTimestamp; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -1642,4 +1647,46 @@ public abstract class AbstractCSQueue implements CSQueue { return "capacity=" + queueCapacities.getCapacity(); } } + + public boolean isEligibleForAutoDeletion() { + return false; + } + + public boolean isInactiveDynamicQueue() { + long idleDurationSeconds = + (Time.monotonicNow() - getLastSubmittedTimestamp())/1000; + return isDynamicQueue() && isEligibleForAutoDeletion() && + (idleDurationSeconds > this.csContext.getConfiguration(). 
+ getAutoExpiredDeletionTime()); + } + + public void updateLastSubmittedTimeStamp() { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = Time.monotonicNow(); + } finally { + writeLock.unlock(); + } + } + + public long getLastSubmittedTimestamp() { + readLock.lock(); + + try { + return lastSubmittedTimestamp; + } finally { + readLock.unlock(); + } + } + + @VisibleForTesting + public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = lastSubmittedTimestamp; + } finally { + writeLock.unlock(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java new file mode 100644 index 00000000000..4b47bb47164 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueDeletionPolicy.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Auto deletion policy for auto created queue V2. + * Just for weight based auto created queues. + */ +public class AutoCreatedQueueDeletionPolicy implements SchedulingEditPolicy { + private static final Logger LOG = + LoggerFactory.getLogger(AutoCreatedQueueDeletionPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + + private long monitoringInterval; + + // markedForDeletion: in each interval, + // this set is extended by queues that are eligible for auto deletion. 
+ private Set<String> markedForDeletion = new HashSet<>(); + // sentForDeletion: if in the next interval, + // a queue is still eligible for auto deletion, + // and is already marked for deletion, move it to this set. + private Set<String> sentForDeletion = new HashSet<>(); + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Auto Deletion Policy monitor: {}", this. + getClass().getCanonicalName()); + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + // The monitoring interval equals the + // queue expiration time (seconds, converted to ms). + monitoringInterval = + csConfig.getLong(CapacitySchedulerConfiguration. + AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, + CapacitySchedulerConfiguration. 
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME) * 1000; + + prepareForAutoDeletion(); + } + + public void prepareForAutoDeletion() { + Set<String> newMarks = new HashSet<>(); + for (Map.Entry<String, CSQueue> queueEntry : + scheduler.getCapacitySchedulerQueueManager().getQueues().entrySet()) { + String queuePath = queueEntry.getKey(); + CSQueue queue = queueEntry.getValue(); + if (queue instanceof AbstractCSQueue && + ((AbstractCSQueue) queue).isEligibleForAutoDeletion()) { + if (markedForDeletion.contains(queuePath)) { + sentForDeletion.add(queuePath); + markedForDeletion.remove(queuePath); + } else { + newMarks.add(queuePath); + } + } + } + markedForDeletion.clear(); + markedForDeletion.addAll(newMarks); + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + prepareForAutoDeletion(); + triggerAutoDeletionForExpiredQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + public void triggerAutoDeletionForExpiredQueues() { + // Process the auto created queues sent for deletion + for (String queueName : sentForDeletion) { + CSQueue checkQueue = + scheduler.getCapacitySchedulerQueueManager(). 
+ getQueue(queueName); + deleteAutoCreatedQueue(checkQueue); + } + sentForDeletion.clear(); + } + + private void deleteAutoCreatedQueue(CSQueue queue) { + if (queue != null) { + AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent = + new AutoCreatedQueueDeletionEvent(queue); + LOG.info("Queue:" + queue.getQueuePath() + + " will trigger deletion event to CS."); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + autoCreatedQueueDeletionEvent); + } + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return AutoCreatedQueueDeletionPolicy.class.getCanonicalName(); + } + + @VisibleForTesting + public Set getMarkedForDeletion() { + return markedForDeletion; + } + + @VisibleForTesting + public Set getSentForDeletion() { + return sentForDeletion; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee91b0c3825..467dacbfbb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -143,9 +143,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event .QueueManagementChangeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -2106,11 +2106,35 @@ public class CapacityScheduler extends } } break; + case AUTO_QUEUE_DELETION: + try { + AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent = + (AutoCreatedQueueDeletionEvent) event; + removeAutoCreatedQueue(autoCreatedQueueDeletionEvent. + getCheckQueue()); + } catch (SchedulerDynamicEditException sde) { + LOG.error("Dynamic queue deletion cannot be applied for " + + "queue : ", sde); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". 
Ignoring!"); } } + private void removeAutoCreatedQueue(CSQueue checkQueue) + throws SchedulerDynamicEditException{ + writeLock.lock(); + try { + if (checkQueue instanceof AbstractCSQueue + && ((AbstractCSQueue) checkQueue).isInactiveDynamicQueue()) { + removeQueue(checkQueue); + } + } finally { + writeLock.unlock(); + } + } + private void updateNodeAttributes( NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { writeLock.lock(); @@ -2564,6 +2588,44 @@ public class CapacityScheduler extends } } + public void removeQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + LOG.info("Removing queue: " + queue.getQueuePath()); + if (!((AbstractCSQueue)queue).isDynamicQueue()) { + throw new SchedulerDynamicEditException( + "The queue that we are asked " + + "to remove (" + queue.getQueuePath() + + ") is not a DynamicQueue"); + } + + if (!((AbstractCSQueue) queue).isEligibleForAutoDeletion()) { + LOG.warn("Queue " + queue.getQueuePath() + + " is marked for deletion, but not eligible for deletion"); + return; + } + + ParentQueue parentQueue = (ParentQueue)queue.getParent(); + if (parentQueue != null) { + parentQueue.removeChildQueue(queue); + } else { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " can't be removed because its parent is null"); + } + + if (parentQueue.childQueues.contains(queue) || + queueManager.getQueue(queue.getQueuePath()) != null) { + throw new SchedulerDynamicEditException( + "The queue " + queue.getQueuePath() + + " has not been removed normally."); + } + } finally { + writeLock.unlock(); + } + } + @Override public void addQueue(Queue queue) throws SchedulerDynamicEditException, IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b66ab85733e..90979dc94dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2200,6 +2200,62 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = 1500L; + @Private + public static final boolean + DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = true; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-auto-removal.enable"; + + // 300s for expired default + @Private + public static final long + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = 300; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = + PREFIX + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-expiration-time"; + + /** + * If true, auto created queue with weight mode + * will be deleted when queue is expired. + * @param queuePath the queue's path for auto deletion check + * @return true if auto created queue's deletion when expired is enabled + * else false. Default + * is true. 
+ */ + @Private + public boolean isAutoExpiredDeletionEnabled(String queuePath) { + boolean isAutoExpiredDeletionEnabled = getBoolean( + getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE); + return isAutoExpiredDeletionEnabled; + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionEnabled(String queuePath, + boolean autoRemovalEnable) { + setBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, + autoRemovalEnable); + } + + @Private + @VisibleForTesting + public void setAutoExpiredDeletionTime(long time) { + setLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, time); + } + + @Private + @VisibleForTesting + public long getAutoExpiredDeletionTime() { + return getLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME); + } + /** * Time in milliseconds between invocations * of QueueConfigurationAutoRefreshPolicy. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 104a89caee0..71e65cb4367 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -614,6 +614,9 @@ public class LeafQueue extends AbstractCSQueue { // Careful! Locking order is important! validateSubmitApplication(applicationId, userName, queue); + // Signal for expired auto deletion. 
+ updateLastSubmittedTimeStamp(); + // Inform the parent queue try { getParent().submitApplication(applicationId, userName, queue); @@ -2402,4 +2405,11 @@ public class LeafQueue extends AbstractCSQueue { } return appsToReturn; } + + @Override + public boolean isEligibleForAutoDeletion() { + return isDynamicQueue() && getNumApplications() == 0 + && csContext.getConfiguration(). + isAutoExpiredDeletionEnabled(this.getQueuePath()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index ce5e49040e6..3d289331413 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -570,9 +570,10 @@ public class ParentQueue extends AbstractCSQueue { CSQueue newQueue = createNewQueue(childQueuePath, isLeaf); this.childQueues.add(newQueue); + updateLastSubmittedTimeStamp(); - // Call updateClusterResource - // , which will deal with all effectiveMin/MaxResource + // Call updateClusterResource. 
+ // Which will deal with all effectiveMin/MaxResource // Calculation this.updateClusterResource(csContext.getClusterResource(), new ResourceLimits(this.csContext.getClusterResource())); @@ -583,6 +584,28 @@ public class ParentQueue extends AbstractCSQueue { } } + + // New method to remove child queue + public void removeChildQueue(CSQueue queue) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + // Now we can do remove and update + this.childQueues.remove(queue); + this.scheduler.getCapacitySchedulerQueueManager() + .removeQueue(queue.getQueuePath()); + + // Call updateClusterResource, + // which will deal with all effectiveMin/MaxResource + // Calculation + this.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); + + } finally { + writeLock.unlock(); + } + } + /** * Check whether this queue supports adding additional child queues * dynamically. @@ -1607,4 +1630,11 @@ public class ParentQueue extends AbstractCSQueue { Map getEffectiveMinRatioPerResource() { return effectiveMinRatioPerResource; } + + @Override + public boolean isEligibleForAutoDeletion() { + return isDynamicQueue() && getChildQueues().size() == 0 && + csContext.getConfiguration(). 
+ isAutoExpiredDeletionEnabled(this.getQueuePath()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java new file mode 100644 index 00000000000..68b86dda408 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AutoCreatedQueueDeletionEvent.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class AutoCreatedQueueDeletionEvent extends SchedulerEvent{ + private CSQueue checkQueue; + public AutoCreatedQueueDeletionEvent(CSQueue checkQueue) { + super(SchedulerEventType.AUTO_QUEUE_DELETION); + this.checkQueue = checkQueue; + } + + public CSQueue getCheckQueue() { + return checkQueue; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 869bf0ed9e4..3b8a1de64e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -55,5 +55,8 @@ public enum SchedulerEventType { MARK_CONTAINER_FOR_NONKILLABLE, //Queue Management Change - MANAGE_QUEUE + MANAGE_QUEUE, + + // Auto created queue, auto deletion check + AUTO_QUEUE_DELETION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java index 84126c72877..f04081e4860 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy; import org.junit.Test; import java.util.HashSet; @@ -91,5 +92,47 @@ public class TestSchedulingMonitor { YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); cs.reinitialize(conf, rm.getRMContext()); assertTrue(smm.isRSMEmpty()); + rm.close(); + } + + @Test(timeout = 10000) + public void testRMUpdateAutoCreatedQueueDeletionPolicy() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + AutoCreatedQueueDeletionPolicy.class.getCanonicalName()); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + SchedulingMonitorManager smm = cs.getSchedulingMonitorManager(); + + // runningSchedulingMonitors should not be empty when initialize RM + // scheduler monitor + cs.reinitialize(conf, rm.getRMContext()); + assertFalse(smm.isRSMEmpty()); + + // make sure runningSchedulingPolicies contains all the 
configured policy + // in YARNConfiguration + String[] configuredPolicies = conf.getStrings( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES); + Set configurePoliciesSet = new HashSet<>(); + for (String s : configuredPolicies) { + configurePoliciesSet.add(s); + } + assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet)); + + // make sure the running monitor contains AutoCreatedQueueDeletionPolicy + assertTrue(configurePoliciesSet. + contains(AutoCreatedQueueDeletionPolicy.class.getCanonicalName())); + + // disable RM scheduler monitor + conf.setBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + cs.reinitialize(conf, rm.getRMContext()); + assertTrue(smm.isRSMEmpty()); + rm.close(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java new file mode 100644 index 00000000000..5359178d3aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedQueueDeletionPolicy.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.junit.Assert; +import org.junit.Test; + +public class TestAutoCreatedQueueDeletionPolicy + extends TestCapacitySchedulerNewQueueAutoCreation { + private CapacityScheduler cs; + private AutoCreatedQueueDeletionPolicy policy; + + public void prepareForSchedule() throws Exception{ + super.startScheduler(); + + policy = getPolicy(); + cs = getCs(); + + policy.editSchedule(); + // No queue should be marked or sent for deletion yet + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + Assert.assertEquals(0, policy.getSentForDeletion().size()); + + createQueue("root.e.e1"); + } + + @Test + public void testEditSchedule() throws Exception { + prepareForSchedule(); + // Make sure e not null + AbstractCSQueue e = (AbstractCSQueue) cs. + getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + // Make sure e1 not null + AbstractCSQueue e1 = (AbstractCSQueue)cs. 
+ getQueue("root.e.e1"); + Assert.assertNotNull(e1); + Assert.assertTrue(e1.isDynamicQueue()); + // e1 was created without an app submission, so set its timestamp here + e1.setLastSubmittedTimestamp(Time.monotonicNow()); + + ApplicationAttemptId user0AppAttemptId = + submitApp(cs, USER0, USER0, "root.e"); + + // Wait user0 created successfully. + GenericTestUtils.waitFor(()-> cs.getQueue( + "root.e.user_0") != null, 100, + 2000); + // Make sure user0 not null + AbstractCSQueue user0 = (AbstractCSQueue) cs + .getQueue("root.e.user_0"); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + // Make app finished + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(user0AppAttemptId, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + user0AppAttemptId.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + // There are no apps in user0 + Assert.assertEquals(0, user0.getNumApplications()); + + // Wait the time expired. + long l1 = user0.getLastSubmittedTimestamp(); + GenericTestUtils.waitFor(() -> { + long duration = (Time.monotonicNow() - l1)/1000; + return duration > getCs(). + getConfiguration().getAutoExpiredDeletionTime(); + }, 100, 2000); + + long l2 = e1.getLastSubmittedTimestamp(); + GenericTestUtils.waitFor(() -> { + long duration = (Time.monotonicNow() - l2)/1000; + return duration > getCs(). + getConfiguration().getAutoExpiredDeletionTime(); + }, 100, 2000); + + policy.editSchedule(); + // Make sure user_0 , e1 queue + // will be scheduled to mark for deletion + // because it is expired for deletion. + Assert.assertEquals(2, policy.getMarkedForDeletion().size()); + Assert.assertTrue(policy. + getMarkedForDeletion().contains("root.e.user_0")); + Assert.assertTrue(policy. + getMarkedForDeletion().contains("root.e.e1")); + // Make sure the send for deletion is empty for first mark. 
+ Assert.assertEquals(0, policy.getSentForDeletion().size()); + + // Make sure user_0 , e1 queue will be scheduled to send for deletion + policy.prepareForAutoDeletion(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + Assert.assertEquals(2, policy.getSentForDeletion().size()); + + // Make sure e1, user0 not null before trigger remove. + e1 = (AbstractCSQueue) cs.getQueue("root.e.e1"); + Assert.assertNotNull(e1); + user0 = (AbstractCSQueue)cs.getQueue("root.e.user_0"); + Assert.assertNotNull(user0); + + // Make sure e1, user0 will be null after trigger remove. + policy.triggerAutoDeletionForExpiredQueues(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + Assert.assertEquals(0, policy.getSentForDeletion().size()); + + // Wait e1, user0 auto deleted. + GenericTestUtils.waitFor(()-> cs.getQueue( + "root.e.e1") == null, + 100, 2000); + GenericTestUtils.waitFor(()-> cs.getQueue( + "root.e.user_0") == null, + 100, 2000); + e1 = (AbstractCSQueue) cs.getQueue("root.e.e1"); + Assert.assertNull(e1); + user0 = (AbstractCSQueue)cs.getQueue("root.e.user_0"); + Assert.assertNull(user0); + + // Make sure e is not null, before schedule. + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + + // Expired for e + // Wait e marked for deletion. + long l3 = e.getLastSubmittedTimestamp(); + GenericTestUtils.waitFor(() -> { + long duration = (Time.monotonicNow() - l3)/1000; + return duration > getCs(). 
+ getConfiguration().getAutoExpiredDeletionTime(); + }, 100, 2000); + policy.editSchedule(); + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + Assert.assertEquals(1, policy.getMarkedForDeletion().size()); + Assert.assertEquals(0, policy.getSentForDeletion().size()); + Assert.assertTrue(policy.getMarkedForDeletion().contains("root.e")); + + // Make sure e queue will be scheduled to send for deletion + policy.prepareForAutoDeletion(); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + Assert.assertEquals(1, policy.getSentForDeletion().size()); + + // Make sure e not null before trigger remove. + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNotNull(e); + + // Make sure e will be null after trigger remove. + policy.triggerAutoDeletionForExpiredQueues(); + // Wait e auto deleted. + GenericTestUtils.waitFor(()-> cs.getQueue( + "root.e") == null, 100, 2000); + Assert.assertEquals(0, policy.getMarkedForDeletion().size()); + Assert.assertEquals(0, policy.getSentForDeletion().size()); + e = (AbstractCSQueue) cs.getQueue("root.e"); + Assert.assertNull(e); + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 2f83f1f0504..45c411f81c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -26,12 +28,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; @@ -39,6 +44,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
+import java.util.Set; +import java.util.HashSet; + public class TestCapacitySchedulerNewQueueAutoCreation extends TestCapacitySchedulerAutoCreatedQueueBase { private static final Logger LOG = LoggerFactory.getLogger( @@ -50,6 +58,16 @@ public class TestCapacitySchedulerNewQueueAutoCreation private CapacityScheduler cs; private CapacitySchedulerConfiguration csConf; private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private AutoCreatedQueueDeletionPolicy policy = new + AutoCreatedQueueDeletionPolicy(); + + public CapacityScheduler getCs() { + return cs; + } + + public AutoCreatedQueueDeletionPolicy getPolicy() { + return policy; + } /* Create the following structure: @@ -75,9 +93,12 @@ public class TestCapacitySchedulerNewQueueAutoCreation csConf.setAutoQueueCreationV2Enabled("root", true); csConf.setAutoQueueCreationV2Enabled("root.a", true); csConf.setAutoQueueCreationV2Enabled("root.e", true); + csConf.setAutoQueueCreationV2Enabled(PARENT_QUEUE, true); + // Use a short expiry so the auto deletion tests do not wait long + csConf.setAutoExpiredDeletionTime(1); } - private void startScheduler() throws Exception { + protected void startScheduler() throws Exception { RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(csConf); mockRM = new MockRM(csConf) { @@ -87,6 +108,8 @@ public class TestCapacitySchedulerNewQueueAutoCreation }; cs = (CapacityScheduler) mockRM.getResourceScheduler(); cs.updatePlacementRules(); + // Init the policy that auto deletes expired auto created queues + policy.init(cs.getConfiguration(), cs.getRMContext(), cs); mockRM.start(); cs.start(); autoQueueHandler = new CapacitySchedulerAutoQueueHandler( @@ -506,7 +529,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation Assert.assertTrue(user0.isDynamicQueue()); Assert.assertTrue(user0 instanceof LeafQueue); - LeafQueue user0LeafQueue = (LeafQueue)user0; + LeafQueue user0LeafQueue = (LeafQueue) user0; // Assert user limit factor is -1 Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == 
-1); @@ -517,10 +540,11 @@ public class TestCapacitySchedulerNewQueueAutoCreation // Assert AM Resource Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(), - user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6); + user0LeafQueue. + getMaxAMResourcePerQueuePercent() * MAX_MEMORY * GB, 1e-6); // Assert user limit (no limit) when limit factor is -1 - Assert.assertEquals(MAX_MEMORY*GB, + Assert.assertEquals(MAX_MEMORY * GB, user0LeafQueue.getEffectiveMaxCapacityDown("", user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6); } @@ -585,7 +609,274 @@ public class TestCapacitySchedulerNewQueueAutoCreation } - private LeafQueue createQueue(String queuePath) throws YarnException { + @Test + public void testCapacitySchedulerAutoQueueDeletion() throws Exception { + startScheduler(); + csConf.setBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + csConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + AutoCreatedQueueDeletionPolicy.class.getCanonicalName()); + csConf.setAutoExpiredDeletionTime(1); + cs.reinitialize(csConf, mockRM.getRMContext()); + + Set<String> policies = new HashSet<>(); + policies.add( + AutoCreatedQueueDeletionPolicy.class.getCanonicalName()); + + Assert.assertTrue( + "No AutoCreatedQueueDeletionPolicy " + + "is present in running monitors", + cs.getSchedulingMonitorManager(). + isSameConfiguredPolicies(policies)); + + ApplicationAttemptId a2App = submitApp(cs, USER0, + "a2-auto", "root.a.a1-auto"); + + // Wait a2 created successfully. 
+ GenericTestUtils.waitFor(()-> cs.getQueue( + "root.a.a1-auto.a2-auto") != null, + 100, 2000); + + AbstractCSQueue a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNotNull("a1 is not present", a1); + AbstractCSQueue a2 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + Assert.assertNotNull("a2 is not present", a2); + Assert.assertTrue("a2 is not a dynamic queue", + a2.isDynamicQueue()); + + // There is still 1 app in the a2 queue. + Assert.assertEquals(1, a2.getNumApplications()); + + // Wait the time expired. + long l1 = a2.getLastSubmittedTimestamp(); + GenericTestUtils.waitFor(() -> { + long duration = (Time.monotonicNow() - l1)/1000; + return duration > csConf.getAutoExpiredDeletionTime(); + }, 100, 2000); + + // Make sure the queue will not be deleted + // when expired with remaining apps. + a2 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + Assert.assertNotNull("a2 is not present", a2); + + // Make app finished. + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(a2App, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + a2App.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + // Now there are no apps in a2 queue. + Assert.assertEquals(0, a2.getNumApplications()); + + // Wait the a2 deleted. + GenericTestUtils.waitFor(() -> { + AbstractCSQueue a2Tmp = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + return a2Tmp == null; + }, 100, 3000); + + a2 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + Assert.assertNull("a2 is not deleted", a2); + + // The parent will not be deleted with child queues + a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNotNull("a1 is not present", a1); + + // Now the parent queue without child + // will be deleted for expired. + // Wait a1 deleted. 
+ GenericTestUtils.waitFor(() -> { + AbstractCSQueue a1Tmp = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + return a1Tmp == null; + }, 100, 3000); + a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNull("a1 is not deleted", a1); + } + + @Test + public void testCapacitySchedulerAutoQueueDeletionDisabled() + throws Exception { + startScheduler(); + // Test for disabled auto deletion + csConf.setAutoExpiredDeletionEnabled( + "root.a.a1-auto.a2-auto", false); + csConf.setBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + csConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + AutoCreatedQueueDeletionPolicy.class.getCanonicalName()); + csConf.setAutoExpiredDeletionTime(1); + cs.reinitialize(csConf, mockRM.getRMContext()); + + Set<String> policies = new HashSet<>(); + policies.add( + AutoCreatedQueueDeletionPolicy.class.getCanonicalName()); + + Assert.assertTrue( + "No AutoCreatedQueueDeletionPolicy " + + "is present in running monitors", + cs.getSchedulingMonitorManager(). + isSameConfiguredPolicies(policies)); + + ApplicationAttemptId a2App = submitApp(cs, USER0, + "a2-auto", "root.a.a1-auto"); + + // Wait a2 created successfully. + GenericTestUtils.waitFor(()-> cs.getQueue( + "root.a.a1-auto.a2-auto") != null, + 100, 2000); + + AbstractCSQueue a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNotNull("a1 is not present", a1); + AbstractCSQueue a2 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + Assert.assertNotNull("a2 is not present", a2); + Assert.assertTrue("a2 is not a dynamic queue", + a2.isDynamicQueue()); + + // Make app finished. + AppAttemptRemovedSchedulerEvent event = + new AppAttemptRemovedSchedulerEvent(a2App, + RMAppAttemptState.FINISHED, false); + cs.handle(event); + AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent( + a2App.getApplicationId(), RMAppState.FINISHED); + cs.handle(rEvent); + + // Now there are no apps in a2 queue. 
+ Assert.assertEquals(0, a2.getNumApplications()); + + // Wait the time expired. + long l1 = a2.getLastSubmittedTimestamp(); + GenericTestUtils.waitFor(() -> { + long duration = (Time.monotonicNow() - l1)/1000; + return duration > csConf.getAutoExpiredDeletionTime(); + }, 100, 2000); + + // The auto deletion is not enabled for a2-auto + a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNotNull("a1 is not present", a1); + a2 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + Assert.assertNotNull("a2 is not present", a2); + Assert.assertTrue("a2 is not a dynamic queue", + a2.isDynamicQueue()); + + // Enabled now + // The auto deletion will work. + csConf.setAutoExpiredDeletionEnabled( + "root.a.a1-auto.a2-auto", true); + cs.reinitialize(csConf, mockRM.getRMContext()); + + // Wait the a2 deleted. + GenericTestUtils.waitFor(() -> { + AbstractCSQueue a2Tmp = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto.a2-auto"); + return a2Tmp == null; + }, 100, 3000); + + a2 = (AbstractCSQueue) cs. + getQueue("root.a.a1-auto.a2-auto"); + Assert.assertNull("a2 is not deleted", a2); + // The parent will not be deleted with child queues + a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNotNull("a1 is not present", a1); + + // Now the parent queue without child + // will be deleted for expired. + // Wait a1 deleted. 
+ GenericTestUtils.waitFor(() -> { + AbstractCSQueue a1Tmp = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + return a1Tmp == null; + }, 100, 3000); + a1 = (AbstractCSQueue) cs.getQueue( + "root.a.a1-auto"); + Assert.assertNull("a1 is not deleted", a1); + } + + @Test + public void testAutoCreateQueueAfterRemoval() throws Exception { + // queue's weights are 1 + // root + // - a (w=1) + // - b (w=1) + // - c-auto (w=1) + // - d-auto (w=1) + // - e-auto (w=1) + // - e1-auto (w=1) + startScheduler(); + + createBasicQueueStructureAndValidate(); + + // Under e, there's only one queue, so e1/e have same capacity + CSQueue e1 = cs.getQueue("root.e-auto.e1-auto"); + Assert.assertEquals(1 / 5f, e1.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, e1.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(240 * GB, + e1.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Check after removal e1. + cs.removeQueue(e1); + CSQueue e = cs.getQueue("root.e-auto"); + Assert.assertEquals(1 / 5f, e.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, e.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(240 * GB, + e.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Check after removal e. + cs.removeQueue(e); + CSQueue d = cs.getQueue("root.d-auto"); + Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(300 * GB, + d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Check after removal d. + cs.removeQueue(d); + CSQueue c = cs.getQueue("root.c-auto"); + Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(400 * GB, + c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Check after removal c. 
+ cs.removeQueue(c); + CSQueue b = cs.getQueue("root.b"); + Assert.assertEquals(1 / 2f, b.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, b.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(600 * GB, + b.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Check can't remove static queue b. + try { + cs.removeQueue(b); + Assert.fail("Can't remove static queue b!"); + } catch (Exception ex) { + Assert.assertTrue(ex + instanceof SchedulerDynamicEditException); + } + // Check a. + CSQueue a = cs.getQueue("root.a"); + Assert.assertEquals(1 / 2f, a.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, a.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(600 * GB, + a.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + } + + protected LeafQueue createQueue(String queuePath) throws YarnException { return autoQueueHandler.autoCreateQueue( CSQueueUtils.extractQueuePath(queuePath)); }