YARN-6123. [YARN-5864] Add a test to make sure queues of orderingPolicy will be updated when childQueues is added or removed. Contributed by Wangda Tan.
This commit is contained in:
parent
6cd39803ba
commit
68b08e96a0
|
@ -343,6 +343,9 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
// Re-sort all queues
|
||||
childQueues.clear();
|
||||
childQueues.addAll(currentChildQueues.values());
|
||||
|
||||
// Make sure we notifies QueueOrderingPolicy
|
||||
queueOrderingPolicy.setQueues(childQueues);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
|
@ -182,4 +183,9 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
|
|||
return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<CSQueue> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -728,12 +728,12 @@ public class TestCapacitySchedulerSurgicalPreemption
|
|||
|
||||
// Call editSchedule again: selected containers are killed
|
||||
editPolicy.editSchedule();
|
||||
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
||||
|
||||
// Do allocation for all nms
|
||||
// Make sure the container killed, then do allocation for all nms
|
||||
for (int i = 0; i < 4; i++) {
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||
}
|
||||
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
||||
|
||||
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
||||
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -31,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
|
@ -936,4 +941,57 @@ public class TestQueueParsing {
|
|||
IOUtils.closeStream(rm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testQueueOrderingPolicyUpdatedAfterReinitialize()
|
||||
throws IOException {
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
setupQueueConfigurationWithoutLabels(csConf);
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
RMContextImpl rmContext =
|
||||
new RMContextImpl(null, null, null, null, null, null,
|
||||
new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null);
|
||||
rmContext.setNodeLabelManager(nodeLabelManager);
|
||||
capacityScheduler.setConf(conf);
|
||||
capacityScheduler.setRMContext(rmContext);
|
||||
capacityScheduler.init(conf);
|
||||
capacityScheduler.start();
|
||||
|
||||
// Add a new b4 queue
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".b",
|
||||
new String[] { "b1", "b2", "b3", "b4" });
|
||||
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b.b4", 0f);
|
||||
ParentQueue bQ = (ParentQueue) capacityScheduler.getQueue("b");
|
||||
checkEqualsToQueueSet(bQ.getChildQueues(),
|
||||
new String[] { "b1", "b2", "b3" });
|
||||
capacityScheduler.reinitialize(new YarnConfiguration(csConf), rmContext);
|
||||
|
||||
// Check child queue of b
|
||||
checkEqualsToQueueSet(bQ.getChildQueues(),
|
||||
new String[] { "b1", "b2", "b3", "b4" });
|
||||
|
||||
PriorityUtilizationQueueOrderingPolicy queueOrderingPolicy =
|
||||
(PriorityUtilizationQueueOrderingPolicy) bQ.getQueueOrderingPolicy();
|
||||
checkEqualsToQueueSet(queueOrderingPolicy.getQueues(),
|
||||
new String[] { "b1", "b2", "b3", "b4" });
|
||||
|
||||
ServiceOperations.stopQuietly(capacityScheduler);
|
||||
}
|
||||
|
||||
private void checkEqualsToQueueSet(List<CSQueue> queues, String[] queueNames) {
|
||||
Set<String> existedQueues = new HashSet<>();
|
||||
for (CSQueue q : queues) {
|
||||
existedQueues.add(q.getQueueName());
|
||||
}
|
||||
for (String q : queueNames) {
|
||||
Assert.assertTrue(existedQueues.remove(q));
|
||||
}
|
||||
Assert.assertTrue(existedQueues.isEmpty());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue