YARN-10532. Capacity Scheduler Auto Queue Creation: Allow auto delete queue when queue is not being used. Contributed by Qi Zhu.

This commit is contained in:
Peter Bacsko 2021-03-04 17:18:35 +01:00
parent a85aeee876
commit 6699198b54
11 changed files with 931 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -154,6 +155,10 @@ public abstract class AbstractCSQueue implements CSQueue {
// is it a dynamic queue?
private boolean dynamicQueue = false;
// The timestamp of the last submitted application to this queue.
// Only applies to dynamic queues.
private long lastSubmittedTimestamp;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
@ -1642,4 +1647,46 @@ public abstract class AbstractCSQueue implements CSQueue {
return "capacity=" + queueCapacities.getCapacity();
}
}
public boolean isEligibleForAutoDeletion() {
return false;
}
public boolean isInactiveDynamicQueue() {
long idleDurationSeconds =
(Time.monotonicNow() - getLastSubmittedTimestamp())/1000;
return isDynamicQueue() && isEligibleForAutoDeletion() &&
(idleDurationSeconds > this.csContext.getConfiguration().
getAutoExpiredDeletionTime());
}
public void updateLastSubmittedTimeStamp() {
writeLock.lock();
try {
this.lastSubmittedTimestamp = Time.monotonicNow();
} finally {
writeLock.unlock();
}
}
public long getLastSubmittedTimestamp() {
readLock.lock();
try {
return lastSubmittedTimestamp;
} finally {
readLock.unlock();
}
}
@VisibleForTesting
public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) {
writeLock.lock();
try {
this.lastSubmittedTimestamp = lastSubmittedTimestamp;
} finally {
writeLock.unlock();
}
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Auto deletion policy for auto created queue V2.
* Just for weight based auto created queues.
*/
public class AutoCreatedQueueDeletionPolicy implements SchedulingEditPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(AutoCreatedQueueDeletionPolicy.class);
private Clock clock;
// Pointer to other RM components
private RMContext rmContext;
private ResourceCalculator rc;
private CapacityScheduler scheduler;
private long monitoringInterval;
// markedForDeletion: in each interval,
// this set is extended by queues that are eligible for auto deletion.
private Set<String> markedForDeletion = new HashSet<>();
// sentForDeletion: if in the next interval,
// there is queue, that is eligible for auto deletion,
// and is already marked for deletion, move it to this queue.
private Set<String> sentForDeletion = new HashSet<>();
@Override
public void init(final Configuration config, final RMContext context,
final ResourceScheduler sched) {
LOG.info("Auto Deletion Policy monitor: {}" + this.
getClass().getCanonicalName());
if (!(sched instanceof CapacityScheduler)) {
throw new YarnRuntimeException("Class " +
sched.getClass().getCanonicalName() + " not instance of " +
CapacityScheduler.class.getCanonicalName());
}
rmContext = context;
scheduler = (CapacityScheduler) sched;
clock = scheduler.getClock();
rc = scheduler.getResourceCalculator();
CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
// The monitor time will equal the
// auto deletion expired time default.
monitoringInterval =
csConfig.getLong(CapacitySchedulerConfiguration.
AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
CapacitySchedulerConfiguration.
DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME) * 1000;
prepareForAutoDeletion();
}
public void prepareForAutoDeletion() {
Set<String> newMarks = new HashSet<>();
for (Map.Entry<String, CSQueue> queueEntry :
scheduler.getCapacitySchedulerQueueManager().getQueues().entrySet()) {
String queuePath = queueEntry.getKey();
CSQueue queue = queueEntry.getValue();
if (queue instanceof AbstractCSQueue &&
((AbstractCSQueue) queue).isEligibleForAutoDeletion()) {
if (markedForDeletion.contains(queuePath)) {
sentForDeletion.add(queuePath);
markedForDeletion.remove(queuePath);
} else {
newMarks.add(queuePath);
}
}
}
markedForDeletion.clear();
markedForDeletion.addAll(newMarks);
}
@Override
public void editSchedule() {
long startTs = clock.getTime();
prepareForAutoDeletion();
triggerAutoDeletionForExpiredQueues();
if (LOG.isDebugEnabled()) {
LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
}
}
public void triggerAutoDeletionForExpiredQueues() {
// Proceed new auto created queues
for (String queueName : sentForDeletion) {
CSQueue checkQueue =
scheduler.getCapacitySchedulerQueueManager().
getQueue(queueName);
deleteAutoCreatedQueue(checkQueue);
}
sentForDeletion.clear();
}
private void deleteAutoCreatedQueue(CSQueue queue) {
if (queue != null) {
AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent =
new AutoCreatedQueueDeletionEvent(queue);
LOG.info("Queue:" + queue.getQueuePath() +
" will trigger deletion event to CS.");
scheduler.getRMContext().getDispatcher().getEventHandler().handle(
autoCreatedQueueDeletionEvent);
}
}
@Override
public long getMonitoringInterval() {
return monitoringInterval;
}
@Override
public String getPolicyName() {
return AutoCreatedQueueDeletionPolicy.class.getCanonicalName();
}
@VisibleForTesting
public Set<String> getMarkedForDeletion() {
return markedForDeletion;
}
@VisibleForTesting
public Set<String> getSentForDeletion() {
return sentForDeletion;
}
}

View File

@ -143,9 +143,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.QueueManagementChangeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AutoCreatedQueueDeletionEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@ -2106,11 +2106,35 @@ public class CapacityScheduler extends
}
}
break;
case AUTO_QUEUE_DELETION:
try {
AutoCreatedQueueDeletionEvent autoCreatedQueueDeletionEvent =
(AutoCreatedQueueDeletionEvent) event;
removeAutoCreatedQueue(autoCreatedQueueDeletionEvent.
getCheckQueue());
} catch (SchedulerDynamicEditException sde) {
LOG.error("Dynamic queue deletion cannot be applied for "
+ "queue : ", sde);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
private void removeAutoCreatedQueue(CSQueue checkQueue)
throws SchedulerDynamicEditException{
writeLock.lock();
try {
if (checkQueue instanceof AbstractCSQueue
&& ((AbstractCSQueue) checkQueue).isInactiveDynamicQueue()) {
removeQueue(checkQueue);
}
} finally {
writeLock.unlock();
}
}
private void updateNodeAttributes(
NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
writeLock.lock();
@ -2564,6 +2588,44 @@ public class CapacityScheduler extends
}
}
public void removeQueue(CSQueue queue)
throws SchedulerDynamicEditException {
writeLock.lock();
try {
LOG.info("Removing queue: " + queue.getQueuePath());
if (!((AbstractCSQueue)queue).isDynamicQueue()) {
throw new SchedulerDynamicEditException(
"The queue that we are asked "
+ "to remove (" + queue.getQueuePath()
+ ") is not a DynamicQueue");
}
if (!((AbstractCSQueue) queue).isEligibleForAutoDeletion()) {
LOG.warn("Queue " + queue.getQueuePath() +
" is marked for deletion, but not eligible for deletion");
return;
}
ParentQueue parentQueue = (ParentQueue)queue.getParent();
if (parentQueue != null) {
((ParentQueue) queue.getParent()).removeChildQueue(queue);
} else {
throw new SchedulerDynamicEditException(
"The queue " + queue.getQueuePath()
+ " can't be removed because it's parent is null");
}
if (parentQueue.childQueues.contains(queue) ||
queueManager.getQueue(queue.getQueuePath()) != null) {
throw new SchedulerDynamicEditException(
"The queue " + queue.getQueuePath()
+ " has not been removed normally.");
}
} finally {
writeLock.unlock();
}
}
@Override
public void addQueue(Queue queue)
throws SchedulerDynamicEditException, IOException {

View File

@ -2200,6 +2200,62 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
1500L;
@Private
public static final boolean
DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE = true;
@Private
public static final String AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE =
AUTO_QUEUE_CREATION_V2_PREFIX + "queue-auto-removal.enable";
// 300s for expired default
@Private
public static final long
DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME = 300;
@Private
public static final String AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME =
PREFIX + AUTO_QUEUE_CREATION_V2_PREFIX + "queue-expiration-time";
/**
* If true, auto created queue with weight mode
* will be deleted when queue is expired.
* @param queuePath the queue's path for auto deletion check
* @return true if auto created queue's deletion when expired is enabled
* else false. Default
* is true.
*/
@Private
public boolean isAutoExpiredDeletionEnabled(String queuePath) {
boolean isAutoExpiredDeletionEnabled = getBoolean(
getQueuePrefix(queuePath) +
AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE,
DEFAULT_AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE);
return isAutoExpiredDeletionEnabled;
}
@Private
@VisibleForTesting
public void setAutoExpiredDeletionEnabled(String queuePath,
boolean autoRemovalEnable) {
setBoolean(getQueuePrefix(queuePath) +
AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE,
autoRemovalEnable);
}
@Private
@VisibleForTesting
public void setAutoExpiredDeletionTime(long time) {
setLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, time);
}
@Private
@VisibleForTesting
public long getAutoExpiredDeletionTime() {
return getLong(AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME);
}
/**
* Time in milliseconds between invocations
* of QueueConfigurationAutoRefreshPolicy.

View File

@ -614,6 +614,9 @@ public class LeafQueue extends AbstractCSQueue {
// Careful! Locking order is important!
validateSubmitApplication(applicationId, userName, queue);
// Signal for expired auto deletion.
updateLastSubmittedTimeStamp();
// Inform the parent queue
try {
getParent().submitApplication(applicationId, userName, queue);
@ -2402,4 +2405,11 @@ public class LeafQueue extends AbstractCSQueue {
}
return appsToReturn;
}
@Override
public boolean isEligibleForAutoDeletion() {
return isDynamicQueue() && getNumApplications() == 0
&& csContext.getConfiguration().
isAutoExpiredDeletionEnabled(this.getQueuePath());
}
}

View File

@ -570,9 +570,10 @@ public class ParentQueue extends AbstractCSQueue {
CSQueue newQueue = createNewQueue(childQueuePath, isLeaf);
this.childQueues.add(newQueue);
updateLastSubmittedTimeStamp();
// Call updateClusterResource
// , which will deal with all effectiveMin/MaxResource
// Call updateClusterResource.
// Which will deal with all effectiveMin/MaxResource
// Calculation
this.updateClusterResource(csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
@ -583,6 +584,28 @@ public class ParentQueue extends AbstractCSQueue {
}
}
// New method to remove child queue
public void removeChildQueue(CSQueue queue)
throws SchedulerDynamicEditException {
writeLock.lock();
try {
// Now we can do remove and update
this.childQueues.remove(queue);
this.scheduler.getCapacitySchedulerQueueManager()
.removeQueue(queue.getQueuePath());
// Call updateClusterResource,
// which will deal with all effectiveMin/MaxResource
// Calculation
this.updateClusterResource(csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
} finally {
writeLock.unlock();
}
}
/**
* Check whether this queue supports adding additional child queues
* dynamically.
@ -1607,4 +1630,11 @@ public class ParentQueue extends AbstractCSQueue {
Map<String, Float> getEffectiveMinRatioPerResource() {
return effectiveMinRatioPerResource;
}
@Override
public boolean isEligibleForAutoDeletion() {
return isDynamicQueue() && getChildQueues().size() == 0 &&
csContext.getConfiguration().
isAutoExpiredDeletionEnabled(this.getQueuePath());
}
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
public class AutoCreatedQueueDeletionEvent extends SchedulerEvent{
private CSQueue checkQueue;
public AutoCreatedQueueDeletionEvent(CSQueue checkQueue) {
super(SchedulerEventType.AUTO_QUEUE_DELETION);
this.checkQueue = checkQueue;
}
public CSQueue getCheckQueue() {
return checkQueue;
}
}

View File

@ -55,5 +55,8 @@ public enum SchedulerEventType {
MARK_CONTAINER_FOR_NONKILLABLE,
//Queue Management Change
MANAGE_QUEUE
MANAGE_QUEUE,
// Auto created queue, auto deletion check
AUTO_QUEUE_DELETION
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy;
import org.junit.Test;
import java.util.HashSet;
@ -91,5 +92,47 @@ public class TestSchedulingMonitor {
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
cs.reinitialize(conf, rm.getRMContext());
assertTrue(smm.isRSMEmpty());
rm.close();
}
@Test(timeout = 10000)
public void testRMUpdateAutoCreatedQueueDeletionPolicy() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
// runningSchedulingMonitors should not be empty when initialize RM
// scheduler monitor
cs.reinitialize(conf, rm.getRMContext());
assertFalse(smm.isRSMEmpty());
// make sure runningSchedulingPolicies contains all the configured policy
// in YARNConfiguration
String[] configuredPolicies = conf.getStrings(
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
Set<String> configurePoliciesSet = new HashSet<>();
for (String s : configuredPolicies) {
configurePoliciesSet.add(s);
}
assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet));
// make sure the running monitor contains AutoCreatedQueueDeletionPolicy
assertTrue(configurePoliciesSet.
contains(AutoCreatedQueueDeletionPolicy.class.getCanonicalName()));
// disable RM scheduler monitor
conf.setBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
cs.reinitialize(conf, rm.getRMContext());
assertTrue(smm.isRSMEmpty());
rm.close();
}
}

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.junit.Assert;
import org.junit.Test;
public class TestAutoCreatedQueueDeletionPolicy
extends TestCapacitySchedulerNewQueueAutoCreation {
private CapacityScheduler cs;
private AutoCreatedQueueDeletionPolicy policy;
public void prepareForSchedule() throws Exception{
super.startScheduler();
policy = getPolicy();
cs = getCs();
policy.editSchedule();
// There are no queues should be scheduled
Assert.assertEquals(policy.getMarkedForDeletion().size(), 0);
Assert.assertEquals(policy.getSentForDeletion().size(), 0);
createQueue("root.e.e1");
}
@Test
public void testEditSchedule() throws Exception {
prepareForSchedule();
// Make sure e not null
AbstractCSQueue e = (AbstractCSQueue) cs.
getQueue("root.e");
Assert.assertNotNull(e);
Assert.assertTrue(e.isDynamicQueue());
// Make sure e1 not null
AbstractCSQueue e1 = (AbstractCSQueue)cs.
getQueue("root.e.e1");
Assert.assertNotNull(e1);
Assert.assertTrue(e1.isDynamicQueue());
// signal it because of without submit created
e1.setLastSubmittedTimestamp(Time.monotonicNow());
ApplicationAttemptId user0AppAttemptId =
submitApp(cs, USER0, USER0, "root.e");
// Wait user0 created successfully.
GenericTestUtils.waitFor(()-> cs.getQueue(
"root.e.user_0") != null, 100,
2000);
// Make sure user0 not null
AbstractCSQueue user0 = (AbstractCSQueue) cs
.getQueue("root.e.user_0");
Assert.assertNotNull(user0);
Assert.assertTrue(user0.isDynamicQueue());
// Make app finished
AppAttemptRemovedSchedulerEvent event =
new AppAttemptRemovedSchedulerEvent(user0AppAttemptId,
RMAppAttemptState.FINISHED, false);
cs.handle(event);
AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
user0AppAttemptId.getApplicationId(), RMAppState.FINISHED);
cs.handle(rEvent);
// There are no apps in user0
Assert.assertEquals(user0.getNumApplications(), 0);
// Wait the time expired.
long l1 = user0.getLastSubmittedTimestamp();
GenericTestUtils.waitFor(() -> {
long duration = (Time.monotonicNow() - l1)/1000;
return duration > getCs().
getConfiguration().getAutoExpiredDeletionTime();
}, 100, 2000);
long l2 = e1.getLastSubmittedTimestamp();
GenericTestUtils.waitFor(() -> {
long duration = (Time.monotonicNow() - l2)/1000;
return duration > getCs().
getConfiguration().getAutoExpiredDeletionTime();
}, 100, 2000);
policy.editSchedule();
// Make sure user_0 , e1 queue
// will be scheduled to mark for deletion
// because it is expired for deletion.
Assert.assertEquals(policy.getMarkedForDeletion().size(), 2);
Assert.assertTrue(policy.
getMarkedForDeletion().contains("root.e.user_0"));
Assert.assertTrue(policy.
getMarkedForDeletion().contains("root.e.e1"));
// Make sure the send for deletion is empty for first mark.
Assert.assertEquals(policy.getSentForDeletion().size(), 0);
// Make sure user_0 , e1 queue will be scheduled to send for deletion
policy.prepareForAutoDeletion();
Assert.assertEquals(policy.getMarkedForDeletion().size(), 0);
Assert.assertEquals(policy.getSentForDeletion().size(), 2);
// Make sure e1, user0 not null before trigger remove.
e1 = (AbstractCSQueue) cs.getQueue("root.e.e1");
Assert.assertNotNull(e1);
user0 = (AbstractCSQueue)cs.getQueue("root.e.user_0");
Assert.assertNotNull(user0);
// Make sure e1, user0 will be null after trigger remove.
policy.triggerAutoDeletionForExpiredQueues();
Assert.assertEquals(policy.getMarkedForDeletion().size(), 0);
Assert.assertEquals(policy.getSentForDeletion().size(), 0);
// Wait e1, user0 auto deleted.
GenericTestUtils.waitFor(()-> cs.getQueue(
"root.e.e1") == null,
100, 2000);
GenericTestUtils.waitFor(()-> cs.getQueue(
"root.e.user_0") == null,
100, 2000);
e1 = (AbstractCSQueue) cs.getQueue("root.e.e1");
Assert.assertNull(e1);
user0 = (AbstractCSQueue)cs.getQueue("root.e.user_0");
Assert.assertNull(user0);
// Make sure e is not null, before schedule.
e = (AbstractCSQueue) cs.getQueue("root.e");
Assert.assertNotNull(e);
// Expired for e
// Wait e marked for deletion.
long l3 = e.getLastSubmittedTimestamp();
GenericTestUtils.waitFor(() -> {
long duration = (Time.monotonicNow() - l3)/1000;
return duration > getCs().
getConfiguration().getAutoExpiredDeletionTime();
}, 100, 2000);
policy.editSchedule();
e = (AbstractCSQueue) cs.getQueue("root.e");
Assert.assertNotNull(e);
Assert.assertEquals(policy.getMarkedForDeletion().size(), 1);
Assert.assertEquals(policy.getSentForDeletion().size(), 0);
Assert.assertTrue(policy.getMarkedForDeletion().contains("root.e"));
// Make sure e queue will be scheduled to send for deletion
policy.prepareForAutoDeletion();
Assert.assertEquals(policy.getMarkedForDeletion().size(), 0);
Assert.assertEquals(policy.getSentForDeletion().size(), 1);
// Make sure e not null before trigger remove.
e = (AbstractCSQueue) cs.getQueue("root.e");
Assert.assertNotNull(e);
// Make sure e will be null after trigger remove.
policy.triggerAutoDeletionForExpiredQueues();
// Wait e1 auto deleted.
GenericTestUtils.waitFor(()-> cs.getQueue(
"root.e") == null, 100, 2000);
Assert.assertEquals(policy.getMarkedForDeletion().size(), 0);
Assert.assertEquals(policy.getSentForDeletion().size(), 0);
e = (AbstractCSQueue) cs.getQueue("root.e");
Assert.assertNull(e);
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -26,12 +28,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
@ -39,6 +44,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.HashSet;
public class TestCapacitySchedulerNewQueueAutoCreation
extends TestCapacitySchedulerAutoCreatedQueueBase {
private static final Logger LOG = LoggerFactory.getLogger(
@ -50,6 +58,16 @@ public class TestCapacitySchedulerNewQueueAutoCreation
private CapacityScheduler cs;
private CapacitySchedulerConfiguration csConf;
private CapacitySchedulerAutoQueueHandler autoQueueHandler;
private AutoCreatedQueueDeletionPolicy policy = new
AutoCreatedQueueDeletionPolicy();
public CapacityScheduler getCs() {
return cs;
}
public AutoCreatedQueueDeletionPolicy getPolicy() {
return policy;
}
/*
Create the following structure:
@ -75,9 +93,12 @@ public class TestCapacitySchedulerNewQueueAutoCreation
csConf.setAutoQueueCreationV2Enabled("root", true);
csConf.setAutoQueueCreationV2Enabled("root.a", true);
csConf.setAutoQueueCreationV2Enabled("root.e", true);
csConf.setAutoQueueCreationV2Enabled(PARENT_QUEUE, true);
// Test for auto deletion when expired
csConf.setAutoExpiredDeletionTime(1);
}
private void startScheduler() throws Exception {
protected void startScheduler() throws Exception {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(csConf);
mockRM = new MockRM(csConf) {
@ -87,6 +108,8 @@ public class TestCapacitySchedulerNewQueueAutoCreation
};
cs = (CapacityScheduler) mockRM.getResourceScheduler();
cs.updatePlacementRules();
// Policy for new auto created queue's auto deletion when expired
policy.init(cs.getConfiguration(), cs.getRMContext(), cs);
mockRM.start();
cs.start();
autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
@ -506,7 +529,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
Assert.assertTrue(user0.isDynamicQueue());
Assert.assertTrue(user0 instanceof LeafQueue);
LeafQueue user0LeafQueue = (LeafQueue)user0;
LeafQueue user0LeafQueue = (LeafQueue) user0;
// Assert user limit factor is -1
Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1);
@ -517,10 +540,11 @@ public class TestCapacitySchedulerNewQueueAutoCreation
// Assert AM Resource
Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(),
user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6);
user0LeafQueue.
getMaxAMResourcePerQueuePercent() * MAX_MEMORY * GB, 1e-6);
// Assert user limit (no limit) when limit factor is -1
Assert.assertEquals(MAX_MEMORY*GB,
Assert.assertEquals(MAX_MEMORY * GB,
user0LeafQueue.getEffectiveMaxCapacityDown("",
user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6);
}
@ -585,7 +609,274 @@ public class TestCapacitySchedulerNewQueueAutoCreation
}
private LeafQueue createQueue(String queuePath) throws YarnException {
@Test
public void testCapacitySchedulerAutoQueueDeletion() throws Exception {
startScheduler();
csConf.setBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
csConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
csConf.setAutoExpiredDeletionTime(1);
cs.reinitialize(csConf, mockRM.getRMContext());
Set<String> policies = new HashSet<>();
policies.add(
AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
Assert.assertTrue(
"No AutoCreatedQueueDeletionPolicy " +
"is present in running monitors",
cs.getSchedulingMonitorManager().
isSameConfiguredPolicies(policies));
ApplicationAttemptId a2App = submitApp(cs, USER0,
"a2-auto", "root.a.a1-auto");
// Wait a2 created successfully.
GenericTestUtils.waitFor(()-> cs.getQueue(
"root.a.a1-auto.a2-auto") != null,
100, 2000);
AbstractCSQueue a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNotNull("a1 is not present", a1);
AbstractCSQueue a2 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
Assert.assertNotNull("a2 is not present", a2);
Assert.assertTrue("a2 is not a dynamic queue",
a2.isDynamicQueue());
// Now there are still 1 app in a2 queue.
Assert.assertEquals(1, a2.getNumApplications());
// Wait the time expired.
long l1 = a2.getLastSubmittedTimestamp();
GenericTestUtils.waitFor(() -> {
long duration = (Time.monotonicNow() - l1)/1000;
return duration > csConf.getAutoExpiredDeletionTime();
}, 100, 2000);
// Make sure the queue will not be deleted
// when expired with remaining apps.
a2 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
Assert.assertNotNull("a2 is not present", a2);
// Make app finished.
AppAttemptRemovedSchedulerEvent event =
new AppAttemptRemovedSchedulerEvent(a2App,
RMAppAttemptState.FINISHED, false);
cs.handle(event);
AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
a2App.getApplicationId(), RMAppState.FINISHED);
cs.handle(rEvent);
// Now there are no apps in a2 queue.
Assert.assertEquals(0, a2.getNumApplications());
// Wait the a2 deleted.
GenericTestUtils.waitFor(() -> {
AbstractCSQueue a2Tmp = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
return a2Tmp == null;
}, 100, 3000);
a2 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
Assert.assertNull("a2 is not deleted", a2);
// The parent will not be deleted with child queues
a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNotNull("a1 is not present", a1);
// Now the parent queue without child
// will be deleted for expired.
// Wait a1 deleted.
GenericTestUtils.waitFor(() -> {
AbstractCSQueue a1Tmp = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
return a1Tmp == null;
}, 100, 3000);
a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNull("a1 is not deleted", a1);
}
@Test
public void testCapacitySchedulerAutoQueueDeletionDisabled()
throws Exception {
startScheduler();
// Test for disabled auto deletion
csConf.setAutoExpiredDeletionEnabled(
"root.a.a1-auto.a2-auto", false);
csConf.setBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
csConf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
csConf.setAutoExpiredDeletionTime(1);
cs.reinitialize(csConf, mockRM.getRMContext());
Set<String> policies = new HashSet<>();
policies.add(
AutoCreatedQueueDeletionPolicy.class.getCanonicalName());
Assert.assertTrue(
"No AutoCreatedQueueDeletionPolicy " +
"is present in running monitors",
cs.getSchedulingMonitorManager().
isSameConfiguredPolicies(policies));
ApplicationAttemptId a2App = submitApp(cs, USER0,
"a2-auto", "root.a.a1-auto");
// Wait a2 created successfully.
GenericTestUtils.waitFor(()-> cs.getQueue(
"root.a.a1-auto.a2-auto") != null,
100, 2000);
AbstractCSQueue a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNotNull("a1 is not present", a1);
AbstractCSQueue a2 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
Assert.assertNotNull("a2 is not present", a2);
Assert.assertTrue("a2 is not a dynamic queue",
a2.isDynamicQueue());
// Make app finished.
AppAttemptRemovedSchedulerEvent event =
new AppAttemptRemovedSchedulerEvent(a2App,
RMAppAttemptState.FINISHED, false);
cs.handle(event);
AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
a2App.getApplicationId(), RMAppState.FINISHED);
cs.handle(rEvent);
// Now there are no apps in a2 queue.
Assert.assertEquals(0, a2.getNumApplications());
// Wait the time expired.
long l1 = a2.getLastSubmittedTimestamp();
GenericTestUtils.waitFor(() -> {
long duration = (Time.monotonicNow() - l1)/1000;
return duration > csConf.getAutoExpiredDeletionTime();
}, 100, 2000);
// The auto deletion is no enabled for a2-auto
a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNotNull("a1 is not present", a1);
a2 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
Assert.assertNotNull("a2 is not present", a2);
Assert.assertTrue("a2 is not a dynamic queue",
a2.isDynamicQueue());
// Enabled now
// The auto deletion will work.
csConf.setAutoExpiredDeletionEnabled(
"root.a.a1-auto.a2-auto", true);
cs.reinitialize(csConf, mockRM.getRMContext());
// Wait the a2 deleted.
GenericTestUtils.waitFor(() -> {
AbstractCSQueue a2Tmp = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto.a2-auto");
return a2Tmp == null;
}, 100, 3000);
a2 = (AbstractCSQueue) cs.
getQueue("root.a.a1-auto.a2-auto");
Assert.assertNull("a2 is not deleted", a2);
// The parent will not be deleted with child queues
a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNotNull("a1 is not present", a1);
// Now the parent queue without child
// will be deleted for expired.
// Wait a1 deleted.
GenericTestUtils.waitFor(() -> {
AbstractCSQueue a1Tmp = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
return a1Tmp == null;
}, 100, 3000);
a1 = (AbstractCSQueue) cs.getQueue(
"root.a.a1-auto");
Assert.assertNull("a1 is not deleted", a1);
}
@Test
public void testAutoCreateQueueAfterRemoval() throws Exception {
// queue's weights are 1
// root
// - a (w=1)
// - b (w=1)
// - c-auto (w=1)
// - d-auto (w=1)
// - e-auto (w=1)
// - e1-auto (w=1)
startScheduler();
createBasicQueueStructureAndValidate();
// Under e, there's only one queue, so e1/e have same capacity
CSQueue e1 = cs.getQueue("root.e-auto.e1-auto");
Assert.assertEquals(1 / 5f, e1.getAbsoluteCapacity(), 1e-6);
Assert.assertEquals(1f, e1.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals(240 * GB,
e1.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
// Check after removal e1.
cs.removeQueue(e1);
CSQueue e = cs.getQueue("root.e-auto");
Assert.assertEquals(1 / 5f, e.getAbsoluteCapacity(), 1e-6);
Assert.assertEquals(1f, e.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals(240 * GB,
e.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
// Check after removal e.
cs.removeQueue(e);
CSQueue d = cs.getQueue("root.d-auto");
Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6);
Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals(300 * GB,
d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
// Check after removal d.
cs.removeQueue(d);
CSQueue c = cs.getQueue("root.c-auto");
Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6);
Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals(400 * GB,
c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
// Check after removal c.
cs.removeQueue(c);
CSQueue b = cs.getQueue("root.b");
Assert.assertEquals(1 / 2f, b.getAbsoluteCapacity(), 1e-6);
Assert.assertEquals(1f, b.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals(600 * GB,
b.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
// Check can't remove static queue b.
try {
cs.removeQueue(b);
Assert.fail("Can't remove static queue b!");
} catch (Exception ex) {
Assert.assertTrue(ex
instanceof SchedulerDynamicEditException);
}
// Check a.
CSQueue a = cs.getQueue("root.a");
Assert.assertEquals(1 / 2f, a.getAbsoluteCapacity(), 1e-6);
Assert.assertEquals(1f, a.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals(600 * GB,
b.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
}
protected LeafQueue createQueue(String queuePath) throws YarnException {
return autoQueueHandler.autoCreateQueue(
CSQueueUtils.extractQueuePath(queuePath));
}