YARN-6123. [YARN-5864] Add a test to make sure queues of orderingPolicy will be updated when childQueues is added or removed. Contributed by Wangda Tan.
This commit is contained in:
parent
2034315763
commit
165f07f51a
@ -343,6 +343,9 @@ public void reinitialize(CSQueue newlyParsedQueue,
|
|||||||
// Re-sort all queues
|
// Re-sort all queues
|
||||||
childQueues.clear();
|
childQueues.clear();
|
||||||
childQueues.addAll(currentChildQueues.values());
|
childQueues.addAll(currentChildQueues.values());
|
||||||
|
|
||||||
|
// Make sure we notifies QueueOrderingPolicy
|
||||||
|
queueOrderingPolicy.setQueues(childQueues);
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
@ -183,4 +184,9 @@ public String getConfigName() {
|
|||||||
return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY;
|
return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<CSQueue> getQueues() {
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -728,12 +728,12 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
|
|||||||
|
|
||||||
// Call editSchedule again: selected containers are killed
|
// Call editSchedule again: selected containers are killed
|
||||||
editPolicy.editSchedule();
|
editPolicy.editSchedule();
|
||||||
|
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
||||||
|
|
||||||
// Do allocation for all nms
|
// Make sure the container killed, then do allocation for all nms
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||||
}
|
}
|
||||||
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
|
||||||
|
|
||||||
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
|
||||||
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
|
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
|
||||||
|
@ -19,6 +19,9 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -31,6 +34,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
@ -936,4 +941,57 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
|||||||
IOUtils.closeStream(rm);
|
IOUtils.closeStream(rm);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueOrderingPolicyUpdatedAfterReinitialize()
|
||||||
|
throws IOException {
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfigurationWithoutLabels(csConf);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
RMContextImpl rmContext =
|
||||||
|
new RMContextImpl(null, null, null, null, null, null,
|
||||||
|
new RMContainerTokenSecretManager(conf),
|
||||||
|
new NMTokenSecretManagerInRM(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
rmContext.setNodeLabelManager(nodeLabelManager);
|
||||||
|
capacityScheduler.setConf(conf);
|
||||||
|
capacityScheduler.setRMContext(rmContext);
|
||||||
|
capacityScheduler.init(conf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
|
||||||
|
// Add a new b4 queue
|
||||||
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".b",
|
||||||
|
new String[] { "b1", "b2", "b3", "b4" });
|
||||||
|
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b.b4", 0f);
|
||||||
|
ParentQueue bQ = (ParentQueue) capacityScheduler.getQueue("b");
|
||||||
|
checkEqualsToQueueSet(bQ.getChildQueues(),
|
||||||
|
new String[] { "b1", "b2", "b3" });
|
||||||
|
capacityScheduler.reinitialize(new YarnConfiguration(csConf), rmContext);
|
||||||
|
|
||||||
|
// Check child queue of b
|
||||||
|
checkEqualsToQueueSet(bQ.getChildQueues(),
|
||||||
|
new String[] { "b1", "b2", "b3", "b4" });
|
||||||
|
|
||||||
|
PriorityUtilizationQueueOrderingPolicy queueOrderingPolicy =
|
||||||
|
(PriorityUtilizationQueueOrderingPolicy) bQ.getQueueOrderingPolicy();
|
||||||
|
checkEqualsToQueueSet(queueOrderingPolicy.getQueues(),
|
||||||
|
new String[] { "b1", "b2", "b3", "b4" });
|
||||||
|
|
||||||
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkEqualsToQueueSet(List<CSQueue> queues, String[] queueNames) {
|
||||||
|
Set<String> existedQueues = new HashSet<>();
|
||||||
|
for (CSQueue q : queues) {
|
||||||
|
existedQueues.add(q.getQueueName());
|
||||||
|
}
|
||||||
|
for (String q : queueNames) {
|
||||||
|
Assert.assertTrue(existedQueues.remove(q));
|
||||||
|
}
|
||||||
|
Assert.assertTrue(existedQueues.isEmpty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user