YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per queue. Contributed by Thomas Graves
(cherry picked from commit 69c8a7f45b
)
This commit is contained in:
parent
67002a5fc0
commit
ebdd88ec81
|
@ -211,6 +211,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-3123. Made YARN CLI show a single completed container even if the app
|
||||
is running. (Naganarasimha G R via zjshen)
|
||||
|
||||
YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per
|
||||
queue (Thomas Graves via jlowe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -285,7 +285,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
RegisterApplicationMasterResponse response = recordFactory
|
||||
.newRecordInstance(RegisterApplicationMasterResponse.class);
|
||||
response.setMaximumResourceCapability(rScheduler
|
||||
.getMaximumResourceCapability());
|
||||
.getMaximumResourceCapability(app.getQueue()));
|
||||
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
||||
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
||||
response.setQueue(app.getQueue());
|
||||
|
|
|
@ -180,6 +180,11 @@ public abstract class AbstractYarnScheduler
|
|||
return maxResource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getMaximumResourceCapability(String queueName) {
|
||||
return getMaximumResourceCapability();
|
||||
}
|
||||
|
||||
protected void initMaximumResourceCapability(Resource maximumAllocation) {
|
||||
maxAllocWriteLock.lock();
|
||||
try {
|
||||
|
@ -635,4 +640,22 @@ public abstract class AbstractYarnScheduler
|
|||
maxAllocWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
|
||||
maxAllocWriteLock.lock();
|
||||
try {
|
||||
configuredMaximumAllocation = Resources.clone(newMaxAlloc);
|
||||
int maxMemory = newMaxAlloc.getMemory();
|
||||
if (maxNodeMemory != -1) {
|
||||
maxMemory = Math.min(maxMemory, maxNodeMemory);
|
||||
}
|
||||
int maxVcores = newMaxAlloc.getVirtualCores();
|
||||
if (maxNodeVCores != -1) {
|
||||
maxVcores = Math.min(maxVcores, maxNodeVCores);
|
||||
}
|
||||
maximumAllocation = Resources.createResource(maxMemory, maxVcores);
|
||||
} finally {
|
||||
maxAllocWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,13 +92,22 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
public Resource getMinimumResourceCapability();
|
||||
|
||||
/**
|
||||
* Get maximum allocatable {@link Resource}.
|
||||
* Get maximum allocatable {@link Resource} at the cluster level.
|
||||
* @return maximum allocatable resource
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public Resource getMaximumResourceCapability();
|
||||
|
||||
/**
|
||||
* Get maximum allocatable {@link Resource} for the queue specified.
|
||||
* @param queueName queue name
|
||||
* @return maximum allocatable resource
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public Resource getMaximumResourceCapability(String queueName);
|
||||
|
||||
@LimitedPrivate("yarn")
|
||||
@Evolving
|
||||
ResourceCalculator getResourceCalculator();
|
||||
|
|
|
@ -57,7 +57,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
volatile int numContainers;
|
||||
|
||||
final Resource minimumAllocation;
|
||||
final Resource maximumAllocation;
|
||||
Resource maximumAllocation;
|
||||
QueueState state;
|
||||
final QueueMetrics metrics;
|
||||
|
||||
|
@ -255,7 +255,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
Set<String> labels, String defaultLabelExpression,
|
||||
Map<String, Float> nodeLabelCapacities,
|
||||
Map<String, Float> maximumNodeLabelCapacities,
|
||||
boolean reservationContinueLooking)
|
||||
boolean reservationContinueLooking, Resource maxAllocation)
|
||||
throws IOException {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
|
@ -326,6 +326,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
this.reservationsContinueLooking = reservationContinueLooking;
|
||||
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||
|
||||
this.maximumAllocation = maxAllocation;
|
||||
}
|
||||
|
||||
protected QueueInfo getQueueInfo() {
|
||||
|
@ -341,7 +343,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
}
|
||||
|
||||
@Private
|
||||
public Resource getMaximumAllocation() {
|
||||
public synchronized Resource getMaximumAllocation() {
|
||||
return maximumAllocation;
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -356,9 +357,11 @@ public class CapacityScheduler extends
|
|||
validateConf(this.conf);
|
||||
try {
|
||||
LOG.info("Re-initializing queues...");
|
||||
refreshMaximumAllocation(this.conf.getMaximumAllocation());
|
||||
reinitializeQueues(this.conf);
|
||||
} catch (Throwable t) {
|
||||
this.conf = oldConf;
|
||||
refreshMaximumAllocation(this.conf.getMaximumAllocation());
|
||||
throw new IOException("Failed to re-init queues", t);
|
||||
}
|
||||
}
|
||||
|
@ -1580,6 +1583,20 @@ public class CapacityScheduler extends
|
|||
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getMaximumResourceCapability(String queueName) {
|
||||
CSQueue queue = getQueue(queueName);
|
||||
if (queue == null) {
|
||||
LOG.error("Unknown queue: " + queueName);
|
||||
return getMaximumResourceCapability();
|
||||
}
|
||||
if (!(queue instanceof LeafQueue)) {
|
||||
LOG.error("queue " + queueName + " is not an leaf queue");
|
||||
return getMaximumResourceCapability();
|
||||
}
|
||||
return ((LeafQueue)queue).getMaximumAllocation();
|
||||
}
|
||||
|
||||
private String handleMoveToPlanQueue(String targetQueueName) {
|
||||
CSQueue dest = getQueue(targetQueueName);
|
||||
if (dest != null && dest instanceof PlanQueue) {
|
||||
|
|
|
@ -108,6 +108,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
@Private
|
||||
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
|
||||
|
||||
@Private
|
||||
public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";
|
||||
|
||||
@Private
|
||||
public static final String MAXIMUM_ALLOCATION_VCORES =
|
||||
"maximum-allocation-vcores";
|
||||
|
||||
@Private
|
||||
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
|
||||
|
||||
|
@ -580,6 +587,48 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
return Resources.createResource(maximumMemory, maximumCores);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the per queue setting for the maximum limit to allocate to
|
||||
* each container request.
|
||||
*
|
||||
* @param queue
|
||||
* name of the queue
|
||||
* @return setting specified per queue else falls back to the cluster setting
|
||||
*/
|
||||
public Resource getMaximumAllocationPerQueue(String queue) {
|
||||
String queuePrefix = getQueuePrefix(queue);
|
||||
int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
|
||||
(int)UNDEFINED);
|
||||
int maxAllocationVcoresPerQueue = getInt(
|
||||
queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("max alloc mb per queue for " + queue + " is "
|
||||
+ maxAllocationMbPerQueue);
|
||||
LOG.debug("max alloc vcores per queue for " + queue + " is "
|
||||
+ maxAllocationVcoresPerQueue);
|
||||
}
|
||||
Resource clusterMax = getMaximumAllocation();
|
||||
if (maxAllocationMbPerQueue == (int)UNDEFINED) {
|
||||
LOG.info("max alloc mb per queue for " + queue + " is undefined");
|
||||
maxAllocationMbPerQueue = clusterMax.getMemory();
|
||||
}
|
||||
if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
|
||||
LOG.info("max alloc vcore per queue for " + queue + " is undefined");
|
||||
maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
|
||||
}
|
||||
Resource result = Resources.createResource(maxAllocationMbPerQueue,
|
||||
maxAllocationVcoresPerQueue);
|
||||
if (maxAllocationMbPerQueue > clusterMax.getMemory()
|
||||
|| maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Queue maximum allocation cannot be larger than the cluster setting"
|
||||
+ " for queue " + queue
|
||||
+ " max allocation per queue: " + result
|
||||
+ " cluster setting: " + clusterMax);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean getEnableUserMetrics() {
|
||||
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
|
||||
}
|
||||
|
|
|
@ -39,6 +39,8 @@ public interface CapacitySchedulerContext {
|
|||
|
||||
Resource getMaximumResourceCapability();
|
||||
|
||||
Resource getMaximumResourceCapability(String queueName);
|
||||
|
||||
RMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||
|
||||
int getNumClusterNodes();
|
||||
|
|
|
@ -97,7 +97,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
Set<FiCaSchedulerApp> pendingApplications;
|
||||
|
||||
private final float minimumAllocationFactor;
|
||||
private float minimumAllocationFactor;
|
||||
|
||||
private Map<String, User> users = new HashMap<String, User>();
|
||||
|
||||
|
@ -162,7 +162,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
|
||||
defaultLabelExpression, this.capacitiyByNodeLabels,
|
||||
this.maxCapacityByNodeLabels,
|
||||
cs.getConfiguration().getReservationContinueLook());
|
||||
cs.getConfiguration().getReservationContinueLook(),
|
||||
cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath()));
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("LeafQueue:" + " name=" + queueName
|
||||
|
@ -192,11 +193,12 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
Set<String> labels, String defaultLabelExpression,
|
||||
Map<String, Float> capacitieByLabel,
|
||||
Map<String, Float> maximumCapacitiesByLabel,
|
||||
boolean revervationContinueLooking) throws IOException {
|
||||
boolean revervationContinueLooking,
|
||||
Resource maxAllocation) throws IOException {
|
||||
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
||||
maximumCapacity, absoluteMaxCapacity, state, acls, labels,
|
||||
defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
|
||||
revervationContinueLooking);
|
||||
revervationContinueLooking, maxAllocation);
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
|
||||
|
@ -238,6 +240,12 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
this.nodeLocalityDelay = nodeLocalityDelay;
|
||||
|
||||
// re-init this since max allocation could have changed
|
||||
this.minimumAllocationFactor =
|
||||
Resources.ratio(resourceCalculator,
|
||||
Resources.subtract(maximumAllocation, minimumAllocation),
|
||||
maximumAllocation);
|
||||
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
||||
|
@ -283,6 +291,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
"minimumAllocationFactor = " + minimumAllocationFactor +
|
||||
" [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
|
||||
"maximumAllocationMemory ]" + "\n" +
|
||||
"maximumAllocation = " + maximumAllocation +
|
||||
" [= configuredMaxAllocation ]" + "\n" +
|
||||
"numContainers = " + numContainers +
|
||||
" [= currentNumContainers ]" + "\n" +
|
||||
"state = " + state +
|
||||
|
@ -479,6 +489,21 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
|
||||
|
||||
// don't allow the maximum allocation to be decreased in size
|
||||
// since we have already told running AM's the size
|
||||
Resource oldMax = getMaximumAllocation();
|
||||
Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
|
||||
if (newMax.getMemory() < oldMax.getMemory()
|
||||
|| newMax.getVirtualCores() < oldMax.getVirtualCores()) {
|
||||
throw new IOException(
|
||||
"Trying to reinitialize "
|
||||
+ getQueuePath()
|
||||
+ " the maximum allocation size can not be decreased!"
|
||||
+ " Current setting: " + oldMax
|
||||
+ ", trying to set it to: " + newMax);
|
||||
}
|
||||
|
||||
setupQueueConfigs(
|
||||
clusterResource,
|
||||
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
|
||||
|
@ -494,7 +519,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
newlyParsedLeafQueue.defaultLabelExpression,
|
||||
newlyParsedLeafQueue.capacitiyByNodeLabels,
|
||||
newlyParsedLeafQueue.maxCapacityByNodeLabels,
|
||||
newlyParsedLeafQueue.reservationsContinueLooking);
|
||||
newlyParsedLeafQueue.reservationsContinueLooking,
|
||||
newlyParsedLeafQueue.getMaximumAllocation());
|
||||
|
||||
// queue metrics are updated, more resource may be available
|
||||
// activate the pending applications if possible
|
||||
|
|
|
@ -132,7 +132,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
||||
maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
|
||||
defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
|
||||
reservationContinueLooking);
|
||||
reservationContinueLooking, maximumAllocation);
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
||||
|
|
|
@ -500,9 +500,12 @@ public class TestCapacityScheduler {
|
|||
public void testParseQueue() throws IOException {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
|
||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||
|
@ -2124,4 +2127,301 @@ public class TestCapacityScheduler {
|
|||
assertFalse("queue " + B2 + " should have been preemptable",
|
||||
queueB2.getPreemptionDisabled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
|
||||
// queue refresh should not allow changing the maximum allocation setting
|
||||
// per queue to be smaller than previous setting
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
cs.reinitialize(conf, mockContext);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
assertEquals("max allocation in CS",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
cs.getMaximumResourceCapability().getMemory());
|
||||
assertEquals("max allocation for A1",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
conf.getMaximumAllocationPerQueue(A1).getMemory());
|
||||
assertEquals("max allocation",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
conf.getMaximumAllocation().getMemory());
|
||||
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueA = findQueue(rootQueue, A);
|
||||
CSQueue queueA1 = findQueue(queueA, A1);
|
||||
assertEquals("queue max allocation", ((LeafQueue) queueA1)
|
||||
.getMaximumAllocation().getMemory(), 8192);
|
||||
|
||||
setMaxAllocMb(conf, A1, 4096);
|
||||
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("should have thrown exception");
|
||||
} catch (IOException e) {
|
||||
assertTrue("max allocation exception",
|
||||
e.getCause().toString().contains("not be decreased"));
|
||||
}
|
||||
|
||||
setMaxAllocMb(conf, A1, 8192);
|
||||
cs.reinitialize(conf, mockContext);
|
||||
|
||||
setMaxAllocVcores(conf, A1,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("should have thrown exception");
|
||||
} catch (IOException e) {
|
||||
assertTrue("max allocation exception",
|
||||
e.getCause().toString().contains("not be decreased"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
|
||||
// verify we can't set the allocation per queue larger then cluster setting
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
// change max allocation for B3 queue to be larger then cluster max
|
||||
setMaxAllocMb(conf, B3,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("should have thrown exception");
|
||||
} catch (IOException e) {
|
||||
assertTrue("maximum allocation exception",
|
||||
e.getCause().getMessage().contains("maximum allocation"));
|
||||
}
|
||||
|
||||
setMaxAllocMb(conf, B3,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
cs.reinitialize(conf, mockContext);
|
||||
|
||||
setMaxAllocVcores(conf, B3,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("should have thrown exception");
|
||||
} catch (IOException e) {
|
||||
assertTrue("maximum allocation exception",
|
||||
e.getCause().getMessage().contains("maximum allocation"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
|
||||
// queue refresh should allow max allocation per queue to go larger
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
setMaxAllocMb(conf,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
setMaxAllocVcores(conf,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
setMaxAllocMb(conf, A1, 4096);
|
||||
setMaxAllocVcores(conf, A1, 2);
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
cs.reinitialize(conf, mockContext);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
assertEquals("max capability MB in CS",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
cs.getMaximumResourceCapability().getMemory());
|
||||
assertEquals("max capability vcores in CS",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
cs.getMaximumResourceCapability().getVirtualCores());
|
||||
assertEquals("max allocation MB A1",
|
||||
4096,
|
||||
conf.getMaximumAllocationPerQueue(A1).getMemory());
|
||||
assertEquals("max allocation vcores A1",
|
||||
2,
|
||||
conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
|
||||
assertEquals("cluster max allocation MB",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
conf.getMaximumAllocation().getMemory());
|
||||
assertEquals("cluster max allocation vcores",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
conf.getMaximumAllocation().getVirtualCores());
|
||||
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueA = findQueue(rootQueue, A);
|
||||
CSQueue queueA1 = findQueue(queueA, A1);
|
||||
assertEquals("queue max allocation", ((LeafQueue) queueA1)
|
||||
.getMaximumAllocation().getMemory(), 4096);
|
||||
|
||||
setMaxAllocMb(conf, A1, 6144);
|
||||
setMaxAllocVcores(conf, A1, 3);
|
||||
cs.reinitialize(conf, null);
|
||||
// conf will have changed but we shouldn't be able to change max allocation
|
||||
// for the actual queue
|
||||
assertEquals("max allocation MB A1", 6144,
|
||||
conf.getMaximumAllocationPerQueue(A1).getMemory());
|
||||
assertEquals("max allocation vcores A1", 3,
|
||||
conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
|
||||
assertEquals("max allocation MB cluster",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
conf.getMaximumAllocation().getMemory());
|
||||
assertEquals("max allocation vcores cluster",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
conf.getMaximumAllocation().getVirtualCores());
|
||||
assertEquals("queue max allocation MB", 6144,
|
||||
((LeafQueue) queueA1).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue max allocation vcores", 3,
|
||||
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
|
||||
assertEquals("max capability MB cluster",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
cs.getMaximumResourceCapability().getMemory());
|
||||
assertEquals("cluster max capability vcores",
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
cs.getMaximumResourceCapability().getVirtualCores());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueuesMaxAllocationCSError() throws Exception {
|
||||
// Try to refresh the cluster level max allocation size to be smaller
|
||||
// and it should error out
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
setMaxAllocMb(conf, 10240);
|
||||
setMaxAllocVcores(conf, 10);
|
||||
setMaxAllocMb(conf, A1, 4096);
|
||||
setMaxAllocVcores(conf, A1, 4);
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
cs.reinitialize(conf, mockContext);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
assertEquals("max allocation MB in CS", 10240,
|
||||
cs.getMaximumResourceCapability().getMemory());
|
||||
assertEquals("max allocation vcores in CS", 10,
|
||||
cs.getMaximumResourceCapability().getVirtualCores());
|
||||
|
||||
setMaxAllocMb(conf, 6144);
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("should have thrown exception");
|
||||
} catch (IOException e) {
|
||||
assertTrue("max allocation exception",
|
||||
e.getCause().toString().contains("not be decreased"));
|
||||
}
|
||||
|
||||
setMaxAllocMb(conf, 10240);
|
||||
cs.reinitialize(conf, mockContext);
|
||||
|
||||
setMaxAllocVcores(conf, 8);
|
||||
try {
|
||||
cs.reinitialize(conf, mockContext);
|
||||
fail("should have thrown exception");
|
||||
} catch (IOException e) {
|
||||
assertTrue("max allocation exception",
|
||||
e.getCause().toString().contains("not be decreased"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
|
||||
// Try to refresh the cluster level max allocation size to be larger
|
||||
// and verify that if there is no setting per queue it uses the
|
||||
// cluster level setting.
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
setMaxAllocMb(conf, 10240);
|
||||
setMaxAllocVcores(conf, 10);
|
||||
setMaxAllocMb(conf, A1, 4096);
|
||||
setMaxAllocVcores(conf, A1, 4);
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
cs.reinitialize(conf, mockContext);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
assertEquals("max allocation MB in CS", 10240,
|
||||
cs.getMaximumResourceCapability().getMemory());
|
||||
assertEquals("max allocation vcores in CS", 10,
|
||||
cs.getMaximumResourceCapability().getVirtualCores());
|
||||
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueA = findQueue(rootQueue, A);
|
||||
CSQueue queueB = findQueue(rootQueue, B);
|
||||
CSQueue queueA1 = findQueue(queueA, A1);
|
||||
CSQueue queueA2 = findQueue(queueA, A2);
|
||||
CSQueue queueB2 = findQueue(queueB, B2);
|
||||
|
||||
assertEquals("queue A1 max allocation MB", 4096,
|
||||
((LeafQueue) queueA1).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue A1 max allocation vcores", 4,
|
||||
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
|
||||
assertEquals("queue A2 max allocation MB", 10240,
|
||||
((LeafQueue) queueA2).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue A2 max allocation vcores", 10,
|
||||
((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
|
||||
assertEquals("queue B2 max allocation MB", 10240,
|
||||
((LeafQueue) queueB2).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue B2 max allocation vcores", 10,
|
||||
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
|
||||
|
||||
setMaxAllocMb(conf, 12288);
|
||||
setMaxAllocVcores(conf, 12);
|
||||
cs.reinitialize(conf, null);
|
||||
// cluster level setting should change and any queues without
|
||||
// per queue setting
|
||||
assertEquals("max allocation MB in CS", 12288,
|
||||
cs.getMaximumResourceCapability().getMemory());
|
||||
assertEquals("max allocation vcores in CS", 12,
|
||||
cs.getMaximumResourceCapability().getVirtualCores());
|
||||
assertEquals("queue A1 max MB allocation", 4096,
|
||||
((LeafQueue) queueA1).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue A1 max vcores allocation", 4,
|
||||
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
|
||||
assertEquals("queue A2 max MB allocation", 12288,
|
||||
((LeafQueue) queueA2).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue A2 max vcores allocation", 12,
|
||||
((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
|
||||
assertEquals("queue B2 max MB allocation", 12288,
|
||||
((LeafQueue) queueB2).getMaximumAllocation().getMemory());
|
||||
assertEquals("queue B2 max vcores allocation", 12,
|
||||
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
|
||||
}
|
||||
|
||||
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
maxAllocMb);
|
||||
}
|
||||
|
||||
private void setMaxAllocMb(CapacitySchedulerConfiguration conf,
|
||||
String queueName, int maxAllocMb) {
|
||||
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
|
||||
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
|
||||
conf.setInt(propName, maxAllocMb);
|
||||
}
|
||||
|
||||
private void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
maxAllocVcores);
|
||||
}
|
||||
|
||||
private void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
|
||||
String queueName, int maxAllocVcores) {
|
||||
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
|
||||
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
|
||||
conf.setInt(propName, maxAllocVcores);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -226,6 +226,18 @@ Hadoop MapReduce Next Generation - Capacity Scheduler
|
|||
| | ensures that a single user can never take more than the queue's configured |
|
||||
| | capacity irrespective of how idle th cluster is. Value is specified as |
|
||||
| | a float.|
|
||||
*--------------------------------------+--------------------------------------+
|
||||
| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb>>> | |
|
||||
| | The per queue maximum limit of memory to allocate to each container |
|
||||
| | request at the Resource Manager. This setting overrides the cluster |
|
||||
| | configuration <<<yarn.scheduler.maximum-allocation-mb>>>. This value |
|
||||
| | must be smaller than or equal to the cluster maximum. |
|
||||
*--------------------------------------+--------------------------------------+
|
||||
| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores>>> | |
|
||||
| | The per queue maximum limit of virtual cores to allocate to each container |
|
||||
| | request at the Resource Manager. This setting overrides the cluster |
|
||||
| | configuration <<<yarn.scheduler.maximum-allocation-vcores>>>. This value |
|
||||
| | must be smaller than or equal to the cluster maximum. |
|
||||
*--------------------------------------+--------------------------------------+
|
||||
|
||||
* Running and Pending Application Limits
|
||||
|
|
Loading…
Reference in New Issue