YARN-6741. Deleting all children of a Parent Queue on refresh throws exception. Contributed by Naganarasimha G R.
This commit is contained in:
parent
a28a3dc8f2
commit
67f9968407
|
@ -326,6 +326,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
+ "it is not yet in stopped state. Current State : "
|
||||
+ oldQueue.getState());
|
||||
}
|
||||
} else if (oldQueue instanceof ParentQueue
|
||||
&& newQueue instanceof LeafQueue) {
|
||||
LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
|
||||
+ " to leaf queue.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -34,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
|
@ -45,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
|
||||
|
@ -62,14 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Private
|
||||
@Evolving
|
||||
public class ParentQueue extends AbstractCSQueue {
|
||||
|
@ -309,18 +307,21 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
// Check if the child-queue already exists
|
||||
if (childQueue != null) {
|
||||
// Check if the child-queue has been converted into parent queue.
|
||||
// The CS has already checked to ensure that this child-queue is in
|
||||
// STOPPED state.
|
||||
if (childQueue instanceof LeafQueue
|
||||
&& newChildQueue instanceof ParentQueue) {
|
||||
// We would convert this LeafQueue to ParentQueue, consider this
|
||||
// as the combination of DELETE then ADD.
|
||||
// Check if the child-queue has been converted into parent queue or
|
||||
// parent Queue has been converted to child queue. The CS has already
|
||||
// checked to ensure that this child-queue is in STOPPED state if
|
||||
// Child queue has been converted to ParentQueue.
|
||||
if ((childQueue instanceof LeafQueue
|
||||
&& newChildQueue instanceof ParentQueue)
|
||||
|| (childQueue instanceof ParentQueue
|
||||
&& newChildQueue instanceof LeafQueue)) {
|
||||
// We would convert this LeafQueue to ParentQueue, or vice versa.
|
||||
// consider this as the combination of DELETE then ADD.
|
||||
newChildQueue.setParent(this);
|
||||
currentChildQueues.put(newChildQueueName, newChildQueue);
|
||||
// inform CapacitySchedulerQueueManager
|
||||
CapacitySchedulerQueueManager queueManager = this.csContext
|
||||
.getCapacitySchedulerQueueManager();
|
||||
CapacitySchedulerQueueManager queueManager =
|
||||
this.csContext.getCapacitySchedulerQueueManager();
|
||||
queueManager.addQueue(newChildQueueName, newChildQueue);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -163,6 +163,7 @@ import java.util.concurrent.CyclicBarrier;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
@ -663,6 +664,36 @@ public class TestCapacityScheduler {
|
|||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf, to be modified
|
||||
* @return, CS configuration which has deleted all childred of queue(b)
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* / \
|
||||
* a1 a2
|
||||
*/
|
||||
private CapacitySchedulerConfiguration setupQueueConfWithOutChildrenOfB(
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
|
||||
// Define top-level queues
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a","b"});
|
||||
|
||||
conf.setCapacity(A, A_CAPACITY);
|
||||
conf.setCapacity(B, B_CAPACITY);
|
||||
|
||||
// Define 2nd-level queues
|
||||
conf.setQueues(A, new String[] {"a1","a2"});
|
||||
conf.setCapacity(A1, A1_CAPACITY);
|
||||
conf.setUserLimitFactor(A1, 100.0f);
|
||||
conf.setCapacity(A2, A2_CAPACITY);
|
||||
conf.setUserLimitFactor(A2, 100.0f);
|
||||
|
||||
LOG.info("Setup top-level queues a and b (without children)");
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf, to be modified
|
||||
* @return, CS configuration which has deleted a queue(b1)
|
||||
|
@ -4643,6 +4674,10 @@ public class TestCapacityScheduler {
|
|||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Expected to NOT throw exception when refresh queue tries to delete"
|
||||
+ " a queue WITHOUT running apps",
|
||||
e);
|
||||
fail("Expected to NOT throw exception when refresh queue tries to delete"
|
||||
+ " a queue WITHOUT running apps");
|
||||
}
|
||||
|
@ -4712,6 +4747,83 @@ public class TestCapacityScheduler {
|
|||
cs.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for all child queue deletion and thus making parent queue a child.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
setupQueueConfiguration(conf);
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
cs.reinitialize(conf, rmContext);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
// test delete all leaf queues when there is no application running.
|
||||
Map<String, CSQueue> queues =
|
||||
cs.getCapacitySchedulerQueueManager().getQueues();
|
||||
|
||||
CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
|
||||
when(bQueue.getState()).thenReturn(QueueState.RUNNING)
|
||||
.thenReturn(QueueState.STOPPED);
|
||||
queues.put("b1", bQueue);
|
||||
|
||||
bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
|
||||
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
|
||||
queues.put("b2", bQueue);
|
||||
|
||||
bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
|
||||
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
|
||||
queues.put("b3", bQueue);
|
||||
|
||||
conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfWithOutChildrenOfB(conf);
|
||||
|
||||
// test convert parent queue to leaf queue(root.b) when there is no
|
||||
// application running.
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("Expected to throw exception when refresh queue tries to make parent"
|
||||
+ " queue a child queue when one of its children is still running.");
|
||||
} catch (IOException e) {
|
||||
//do not do anything, expected exception
|
||||
}
|
||||
|
||||
// test delete leaf queues(root.b.b1,b2,b3) when there is no application
|
||||
// running.
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
fail("Expected to NOT throw exception when refresh queue tries to delete"
|
||||
+ " all children of a parent queue(without running apps).");
|
||||
}
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueB = findQueue(rootQueue, B);
|
||||
assertNotNull("Parent Queue B should not be deleted", queueB);
|
||||
Assert.assertTrue("As Queue'B children are not deleted",
|
||||
queueB instanceof LeafQueue);
|
||||
|
||||
String message =
|
||||
"Refresh needs to support delete of all children of Parent queue.";
|
||||
assertNull(message,
|
||||
cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
|
||||
assertNull(message,
|
||||
cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
|
||||
assertNull(message,
|
||||
cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
|
||||
|
||||
cs.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if we can convert a leaf queue to a parent queue
|
||||
* @throws Exception
|
||||
|
|
Loading…
Reference in New Issue