YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per queue. Contributed by Thomas Graves
This commit is contained in:
parent
c4980a2f34
commit
69c8a7f45b
|
@ -245,6 +245,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-3123. Made YARN CLI show a single completed container even if the app
|
YARN-3123. Made YARN CLI show a single completed container even if the app
|
||||||
is running. (Naganarasimha G R via zjshen)
|
is running. (Naganarasimha G R via zjshen)
|
||||||
|
|
||||||
|
YARN-1582. Capacity Scheduler: add a maximum-allocation-mb setting per
|
||||||
|
queue (Thomas Graves via jlowe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -285,7 +285,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
RegisterApplicationMasterResponse response = recordFactory
|
RegisterApplicationMasterResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterApplicationMasterResponse.class);
|
.newRecordInstance(RegisterApplicationMasterResponse.class);
|
||||||
response.setMaximumResourceCapability(rScheduler
|
response.setMaximumResourceCapability(rScheduler
|
||||||
.getMaximumResourceCapability());
|
.getMaximumResourceCapability(app.getQueue()));
|
||||||
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
||||||
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
||||||
response.setQueue(app.getQueue());
|
response.setQueue(app.getQueue());
|
||||||
|
|
|
@ -180,6 +180,11 @@ public abstract class AbstractYarnScheduler
|
||||||
return maxResource;
|
return maxResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getMaximumResourceCapability(String queueName) {
|
||||||
|
return getMaximumResourceCapability();
|
||||||
|
}
|
||||||
|
|
||||||
protected void initMaximumResourceCapability(Resource maximumAllocation) {
|
protected void initMaximumResourceCapability(Resource maximumAllocation) {
|
||||||
maxAllocWriteLock.lock();
|
maxAllocWriteLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -635,4 +640,22 @@ public abstract class AbstractYarnScheduler
|
||||||
maxAllocWriteLock.unlock();
|
maxAllocWriteLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void refreshMaximumAllocation(Resource newMaxAlloc) {
|
||||||
|
maxAllocWriteLock.lock();
|
||||||
|
try {
|
||||||
|
configuredMaximumAllocation = Resources.clone(newMaxAlloc);
|
||||||
|
int maxMemory = newMaxAlloc.getMemory();
|
||||||
|
if (maxNodeMemory != -1) {
|
||||||
|
maxMemory = Math.min(maxMemory, maxNodeMemory);
|
||||||
|
}
|
||||||
|
int maxVcores = newMaxAlloc.getVirtualCores();
|
||||||
|
if (maxNodeVCores != -1) {
|
||||||
|
maxVcores = Math.min(maxVcores, maxNodeVCores);
|
||||||
|
}
|
||||||
|
maximumAllocation = Resources.createResource(maxMemory, maxVcores);
|
||||||
|
} finally {
|
||||||
|
maxAllocWriteLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,13 +92,22 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
||||||
public Resource getMinimumResourceCapability();
|
public Resource getMinimumResourceCapability();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get maximum allocatable {@link Resource}.
|
* Get maximum allocatable {@link Resource} at the cluster level.
|
||||||
* @return maximum allocatable resource
|
* @return maximum allocatable resource
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public Resource getMaximumResourceCapability();
|
public Resource getMaximumResourceCapability();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get maximum allocatable {@link Resource} for the queue specified.
|
||||||
|
* @param queueName queue name
|
||||||
|
* @return maximum allocatable resource
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public Resource getMaximumResourceCapability(String queueName);
|
||||||
|
|
||||||
@LimitedPrivate("yarn")
|
@LimitedPrivate("yarn")
|
||||||
@Evolving
|
@Evolving
|
||||||
ResourceCalculator getResourceCalculator();
|
ResourceCalculator getResourceCalculator();
|
||||||
|
|
|
@ -57,7 +57,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
volatile int numContainers;
|
volatile int numContainers;
|
||||||
|
|
||||||
final Resource minimumAllocation;
|
final Resource minimumAllocation;
|
||||||
final Resource maximumAllocation;
|
Resource maximumAllocation;
|
||||||
QueueState state;
|
QueueState state;
|
||||||
final QueueMetrics metrics;
|
final QueueMetrics metrics;
|
||||||
|
|
||||||
|
@ -255,7 +255,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
Set<String> labels, String defaultLabelExpression,
|
Set<String> labels, String defaultLabelExpression,
|
||||||
Map<String, Float> nodeLabelCapacities,
|
Map<String, Float> nodeLabelCapacities,
|
||||||
Map<String, Float> maximumNodeLabelCapacities,
|
Map<String, Float> maximumNodeLabelCapacities,
|
||||||
boolean reservationContinueLooking)
|
boolean reservationContinueLooking, Resource maxAllocation)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Sanity check
|
// Sanity check
|
||||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
|
@ -326,6 +326,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
this.reservationsContinueLooking = reservationContinueLooking;
|
this.reservationsContinueLooking = reservationContinueLooking;
|
||||||
|
|
||||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
|
||||||
|
|
||||||
|
this.maximumAllocation = maxAllocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected QueueInfo getQueueInfo() {
|
protected QueueInfo getQueueInfo() {
|
||||||
|
@ -341,7 +343,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public Resource getMaximumAllocation() {
|
public synchronized Resource getMaximumAllocation() {
|
||||||
return maximumAllocation;
|
return maximumAllocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -356,9 +357,11 @@ public class CapacityScheduler extends
|
||||||
validateConf(this.conf);
|
validateConf(this.conf);
|
||||||
try {
|
try {
|
||||||
LOG.info("Re-initializing queues...");
|
LOG.info("Re-initializing queues...");
|
||||||
|
refreshMaximumAllocation(this.conf.getMaximumAllocation());
|
||||||
reinitializeQueues(this.conf);
|
reinitializeQueues(this.conf);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
this.conf = oldConf;
|
this.conf = oldConf;
|
||||||
|
refreshMaximumAllocation(this.conf.getMaximumAllocation());
|
||||||
throw new IOException("Failed to re-init queues", t);
|
throw new IOException("Failed to re-init queues", t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1580,6 +1583,20 @@ public class CapacityScheduler extends
|
||||||
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getMaximumResourceCapability(String queueName) {
|
||||||
|
CSQueue queue = getQueue(queueName);
|
||||||
|
if (queue == null) {
|
||||||
|
LOG.error("Unknown queue: " + queueName);
|
||||||
|
return getMaximumResourceCapability();
|
||||||
|
}
|
||||||
|
if (!(queue instanceof LeafQueue)) {
|
||||||
|
LOG.error("queue " + queueName + " is not an leaf queue");
|
||||||
|
return getMaximumResourceCapability();
|
||||||
|
}
|
||||||
|
return ((LeafQueue)queue).getMaximumAllocation();
|
||||||
|
}
|
||||||
|
|
||||||
private String handleMoveToPlanQueue(String targetQueueName) {
|
private String handleMoveToPlanQueue(String targetQueueName) {
|
||||||
CSQueue dest = getQueue(targetQueueName);
|
CSQueue dest = getQueue(targetQueueName);
|
||||||
if (dest != null && dest instanceof PlanQueue) {
|
if (dest != null && dest instanceof PlanQueue) {
|
||||||
|
|
|
@ -108,6 +108,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
@Private
|
@Private
|
||||||
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
|
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String MAXIMUM_ALLOCATION_VCORES =
|
||||||
|
"maximum-allocation-vcores";
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
|
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
|
||||||
|
|
||||||
|
@ -580,6 +587,48 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
return Resources.createResource(maximumMemory, maximumCores);
|
return Resources.createResource(maximumMemory, maximumCores);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the per queue setting for the maximum limit to allocate to
|
||||||
|
* each container request.
|
||||||
|
*
|
||||||
|
* @param queue
|
||||||
|
* name of the queue
|
||||||
|
* @return setting specified per queue else falls back to the cluster setting
|
||||||
|
*/
|
||||||
|
public Resource getMaximumAllocationPerQueue(String queue) {
|
||||||
|
String queuePrefix = getQueuePrefix(queue);
|
||||||
|
int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
|
||||||
|
(int)UNDEFINED);
|
||||||
|
int maxAllocationVcoresPerQueue = getInt(
|
||||||
|
queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("max alloc mb per queue for " + queue + " is "
|
||||||
|
+ maxAllocationMbPerQueue);
|
||||||
|
LOG.debug("max alloc vcores per queue for " + queue + " is "
|
||||||
|
+ maxAllocationVcoresPerQueue);
|
||||||
|
}
|
||||||
|
Resource clusterMax = getMaximumAllocation();
|
||||||
|
if (maxAllocationMbPerQueue == (int)UNDEFINED) {
|
||||||
|
LOG.info("max alloc mb per queue for " + queue + " is undefined");
|
||||||
|
maxAllocationMbPerQueue = clusterMax.getMemory();
|
||||||
|
}
|
||||||
|
if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
|
||||||
|
LOG.info("max alloc vcore per queue for " + queue + " is undefined");
|
||||||
|
maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
|
||||||
|
}
|
||||||
|
Resource result = Resources.createResource(maxAllocationMbPerQueue,
|
||||||
|
maxAllocationVcoresPerQueue);
|
||||||
|
if (maxAllocationMbPerQueue > clusterMax.getMemory()
|
||||||
|
|| maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Queue maximum allocation cannot be larger than the cluster setting"
|
||||||
|
+ " for queue " + queue
|
||||||
|
+ " max allocation per queue: " + result
|
||||||
|
+ " cluster setting: " + clusterMax);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean getEnableUserMetrics() {
|
public boolean getEnableUserMetrics() {
|
||||||
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
|
return getBoolean(ENABLE_USER_METRICS, DEFAULT_ENABLE_USER_METRICS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,8 @@ public interface CapacitySchedulerContext {
|
||||||
|
|
||||||
Resource getMaximumResourceCapability();
|
Resource getMaximumResourceCapability();
|
||||||
|
|
||||||
|
Resource getMaximumResourceCapability(String queueName);
|
||||||
|
|
||||||
RMContainerTokenSecretManager getContainerTokenSecretManager();
|
RMContainerTokenSecretManager getContainerTokenSecretManager();
|
||||||
|
|
||||||
int getNumClusterNodes();
|
int getNumClusterNodes();
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
Set<FiCaSchedulerApp> pendingApplications;
|
Set<FiCaSchedulerApp> pendingApplications;
|
||||||
|
|
||||||
private final float minimumAllocationFactor;
|
private float minimumAllocationFactor;
|
||||||
|
|
||||||
private Map<String, User> users = new HashMap<String, User>();
|
private Map<String, User> users = new HashMap<String, User>();
|
||||||
|
|
||||||
|
@ -162,7 +162,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
|
state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
|
||||||
defaultLabelExpression, this.capacitiyByNodeLabels,
|
defaultLabelExpression, this.capacitiyByNodeLabels,
|
||||||
this.maxCapacityByNodeLabels,
|
this.maxCapacityByNodeLabels,
|
||||||
cs.getConfiguration().getReservationContinueLook());
|
cs.getConfiguration().getReservationContinueLook(),
|
||||||
|
cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath()));
|
||||||
|
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("LeafQueue:" + " name=" + queueName
|
LOG.debug("LeafQueue:" + " name=" + queueName
|
||||||
|
@ -192,11 +193,12 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
Set<String> labels, String defaultLabelExpression,
|
Set<String> labels, String defaultLabelExpression,
|
||||||
Map<String, Float> capacitieByLabel,
|
Map<String, Float> capacitieByLabel,
|
||||||
Map<String, Float> maximumCapacitiesByLabel,
|
Map<String, Float> maximumCapacitiesByLabel,
|
||||||
boolean revervationContinueLooking) throws IOException {
|
boolean revervationContinueLooking,
|
||||||
|
Resource maxAllocation) throws IOException {
|
||||||
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
||||||
maximumCapacity, absoluteMaxCapacity, state, acls, labels,
|
maximumCapacity, absoluteMaxCapacity, state, acls, labels,
|
||||||
defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
|
defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
|
||||||
revervationContinueLooking);
|
revervationContinueLooking, maxAllocation);
|
||||||
// Sanity check
|
// Sanity check
|
||||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
|
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
|
||||||
|
@ -238,6 +240,12 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
this.nodeLocalityDelay = nodeLocalityDelay;
|
this.nodeLocalityDelay = nodeLocalityDelay;
|
||||||
|
|
||||||
|
// re-init this since max allocation could have changed
|
||||||
|
this.minimumAllocationFactor =
|
||||||
|
Resources.ratio(resourceCalculator,
|
||||||
|
Resources.subtract(maximumAllocation, minimumAllocation),
|
||||||
|
maximumAllocation);
|
||||||
|
|
||||||
StringBuilder aclsString = new StringBuilder();
|
StringBuilder aclsString = new StringBuilder();
|
||||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||||
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
||||||
|
@ -283,6 +291,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
"minimumAllocationFactor = " + minimumAllocationFactor +
|
"minimumAllocationFactor = " + minimumAllocationFactor +
|
||||||
" [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
|
" [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
|
||||||
"maximumAllocationMemory ]" + "\n" +
|
"maximumAllocationMemory ]" + "\n" +
|
||||||
|
"maximumAllocation = " + maximumAllocation +
|
||||||
|
" [= configuredMaxAllocation ]" + "\n" +
|
||||||
"numContainers = " + numContainers +
|
"numContainers = " + numContainers +
|
||||||
" [= currentNumContainers ]" + "\n" +
|
" [= currentNumContainers ]" + "\n" +
|
||||||
"state = " + state +
|
"state = " + state +
|
||||||
|
@ -479,6 +489,21 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
|
LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
|
||||||
|
|
||||||
|
// don't allow the maximum allocation to be decreased in size
|
||||||
|
// since we have already told running AM's the size
|
||||||
|
Resource oldMax = getMaximumAllocation();
|
||||||
|
Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
|
||||||
|
if (newMax.getMemory() < oldMax.getMemory()
|
||||||
|
|| newMax.getVirtualCores() < oldMax.getVirtualCores()) {
|
||||||
|
throw new IOException(
|
||||||
|
"Trying to reinitialize "
|
||||||
|
+ getQueuePath()
|
||||||
|
+ " the maximum allocation size can not be decreased!"
|
||||||
|
+ " Current setting: " + oldMax
|
||||||
|
+ ", trying to set it to: " + newMax);
|
||||||
|
}
|
||||||
|
|
||||||
setupQueueConfigs(
|
setupQueueConfigs(
|
||||||
clusterResource,
|
clusterResource,
|
||||||
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
|
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
|
||||||
|
@ -494,7 +519,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
newlyParsedLeafQueue.defaultLabelExpression,
|
newlyParsedLeafQueue.defaultLabelExpression,
|
||||||
newlyParsedLeafQueue.capacitiyByNodeLabels,
|
newlyParsedLeafQueue.capacitiyByNodeLabels,
|
||||||
newlyParsedLeafQueue.maxCapacityByNodeLabels,
|
newlyParsedLeafQueue.maxCapacityByNodeLabels,
|
||||||
newlyParsedLeafQueue.reservationsContinueLooking);
|
newlyParsedLeafQueue.reservationsContinueLooking,
|
||||||
|
newlyParsedLeafQueue.getMaximumAllocation());
|
||||||
|
|
||||||
// queue metrics are updated, more resource may be available
|
// queue metrics are updated, more resource may be available
|
||||||
// activate the pending applications if possible
|
// activate the pending applications if possible
|
||||||
|
|
|
@ -132,7 +132,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
|
||||||
maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
|
maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
|
||||||
defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
|
defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
|
||||||
reservationContinueLooking);
|
reservationContinueLooking, maximumAllocation);
|
||||||
StringBuilder aclsString = new StringBuilder();
|
StringBuilder aclsString = new StringBuilder();
|
||||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||||
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
|
||||||
|
|
|
@ -500,9 +500,12 @@ public class TestCapacityScheduler {
|
||||||
public void testParseQueue() throws IOException {
|
public void testParseQueue() throws IOException {
|
||||||
CapacityScheduler cs = new CapacityScheduler();
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
cs.setConf(new YarnConfiguration());
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
setupQueueConfiguration(conf);
|
setupQueueConfiguration(conf);
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
|
||||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||||
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||||
|
@ -2124,4 +2127,301 @@ public class TestCapacityScheduler {
|
||||||
assertFalse("queue " + B2 + " should have been preemptable",
|
assertFalse("queue " + B2 + " should have been preemptable",
|
||||||
queueB2.getPreemptionDisabled());
|
queueB2.getPreemptionDisabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
|
||||||
|
// queue refresh should not allow changing the maximum allocation setting
|
||||||
|
// per queue to be smaller than previous setting
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
assertEquals("max allocation in CS",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
cs.getMaximumResourceCapability().getMemory());
|
||||||
|
assertEquals("max allocation for A1",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
conf.getMaximumAllocationPerQueue(A1).getMemory());
|
||||||
|
assertEquals("max allocation",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
conf.getMaximumAllocation().getMemory());
|
||||||
|
|
||||||
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
|
CSQueue queueA = findQueue(rootQueue, A);
|
||||||
|
CSQueue queueA1 = findQueue(queueA, A1);
|
||||||
|
assertEquals("queue max allocation", ((LeafQueue) queueA1)
|
||||||
|
.getMaximumAllocation().getMemory(), 8192);
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, A1, 4096);
|
||||||
|
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("should have thrown exception");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue("max allocation exception",
|
||||||
|
e.getCause().toString().contains("not be decreased"));
|
||||||
|
}
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, A1, 8192);
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
|
||||||
|
setMaxAllocVcores(conf, A1,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("should have thrown exception");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue("max allocation exception",
|
||||||
|
e.getCause().toString().contains("not be decreased"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
|
||||||
|
// verify we can't set the allocation per queue larger then cluster setting
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
// change max allocation for B3 queue to be larger then cluster max
|
||||||
|
setMaxAllocMb(conf, B3,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("should have thrown exception");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue("maximum allocation exception",
|
||||||
|
e.getCause().getMessage().contains("maximum allocation"));
|
||||||
|
}
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, B3,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
|
||||||
|
setMaxAllocVcores(conf, B3,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("should have thrown exception");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue("maximum allocation exception",
|
||||||
|
e.getCause().getMessage().contains("maximum allocation"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
|
||||||
|
// queue refresh should allow max allocation per queue to go larger
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
setMaxAllocMb(conf,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
setMaxAllocVcores(conf,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||||
|
setMaxAllocMb(conf, A1, 4096);
|
||||||
|
setMaxAllocVcores(conf, A1, 2);
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
assertEquals("max capability MB in CS",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
cs.getMaximumResourceCapability().getMemory());
|
||||||
|
assertEquals("max capability vcores in CS",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
cs.getMaximumResourceCapability().getVirtualCores());
|
||||||
|
assertEquals("max allocation MB A1",
|
||||||
|
4096,
|
||||||
|
conf.getMaximumAllocationPerQueue(A1).getMemory());
|
||||||
|
assertEquals("max allocation vcores A1",
|
||||||
|
2,
|
||||||
|
conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
|
||||||
|
assertEquals("cluster max allocation MB",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
conf.getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("cluster max allocation vcores",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
conf.getMaximumAllocation().getVirtualCores());
|
||||||
|
|
||||||
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
|
CSQueue queueA = findQueue(rootQueue, A);
|
||||||
|
CSQueue queueA1 = findQueue(queueA, A1);
|
||||||
|
assertEquals("queue max allocation", ((LeafQueue) queueA1)
|
||||||
|
.getMaximumAllocation().getMemory(), 4096);
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, A1, 6144);
|
||||||
|
setMaxAllocVcores(conf, A1, 3);
|
||||||
|
cs.reinitialize(conf, null);
|
||||||
|
// conf will have changed but we shouldn't be able to change max allocation
|
||||||
|
// for the actual queue
|
||||||
|
assertEquals("max allocation MB A1", 6144,
|
||||||
|
conf.getMaximumAllocationPerQueue(A1).getMemory());
|
||||||
|
assertEquals("max allocation vcores A1", 3,
|
||||||
|
conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
|
||||||
|
assertEquals("max allocation MB cluster",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
conf.getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("max allocation vcores cluster",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
conf.getMaximumAllocation().getVirtualCores());
|
||||||
|
assertEquals("queue max allocation MB", 6144,
|
||||||
|
((LeafQueue) queueA1).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue max allocation vcores", 3,
|
||||||
|
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
|
||||||
|
assertEquals("max capability MB cluster",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
cs.getMaximumResourceCapability().getMemory());
|
||||||
|
assertEquals("cluster max capability vcores",
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
cs.getMaximumResourceCapability().getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshQueuesMaxAllocationCSError() throws Exception {
|
||||||
|
// Try to refresh the cluster level max allocation size to be smaller
|
||||||
|
// and it should error out
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
setMaxAllocMb(conf, 10240);
|
||||||
|
setMaxAllocVcores(conf, 10);
|
||||||
|
setMaxAllocMb(conf, A1, 4096);
|
||||||
|
setMaxAllocVcores(conf, A1, 4);
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
assertEquals("max allocation MB in CS", 10240,
|
||||||
|
cs.getMaximumResourceCapability().getMemory());
|
||||||
|
assertEquals("max allocation vcores in CS", 10,
|
||||||
|
cs.getMaximumResourceCapability().getVirtualCores());
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, 6144);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("should have thrown exception");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue("max allocation exception",
|
||||||
|
e.getCause().toString().contains("not be decreased"));
|
||||||
|
}
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, 10240);
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
|
||||||
|
setMaxAllocVcores(conf, 8);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("should have thrown exception");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue("max allocation exception",
|
||||||
|
e.getCause().toString().contains("not be decreased"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
|
||||||
|
// Try to refresh the cluster level max allocation size to be larger
|
||||||
|
// and verify that if there is no setting per queue it uses the
|
||||||
|
// cluster level setting.
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
setMaxAllocMb(conf, 10240);
|
||||||
|
setMaxAllocVcores(conf, 10);
|
||||||
|
setMaxAllocMb(conf, A1, 4096);
|
||||||
|
setMaxAllocVcores(conf, A1, 4);
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
assertEquals("max allocation MB in CS", 10240,
|
||||||
|
cs.getMaximumResourceCapability().getMemory());
|
||||||
|
assertEquals("max allocation vcores in CS", 10,
|
||||||
|
cs.getMaximumResourceCapability().getVirtualCores());
|
||||||
|
|
||||||
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
|
CSQueue queueA = findQueue(rootQueue, A);
|
||||||
|
CSQueue queueB = findQueue(rootQueue, B);
|
||||||
|
CSQueue queueA1 = findQueue(queueA, A1);
|
||||||
|
CSQueue queueA2 = findQueue(queueA, A2);
|
||||||
|
CSQueue queueB2 = findQueue(queueB, B2);
|
||||||
|
|
||||||
|
assertEquals("queue A1 max allocation MB", 4096,
|
||||||
|
((LeafQueue) queueA1).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue A1 max allocation vcores", 4,
|
||||||
|
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
|
||||||
|
assertEquals("queue A2 max allocation MB", 10240,
|
||||||
|
((LeafQueue) queueA2).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue A2 max allocation vcores", 10,
|
||||||
|
((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
|
||||||
|
assertEquals("queue B2 max allocation MB", 10240,
|
||||||
|
((LeafQueue) queueB2).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue B2 max allocation vcores", 10,
|
||||||
|
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
|
||||||
|
|
||||||
|
setMaxAllocMb(conf, 12288);
|
||||||
|
setMaxAllocVcores(conf, 12);
|
||||||
|
cs.reinitialize(conf, null);
|
||||||
|
// cluster level setting should change and any queues without
|
||||||
|
// per queue setting
|
||||||
|
assertEquals("max allocation MB in CS", 12288,
|
||||||
|
cs.getMaximumResourceCapability().getMemory());
|
||||||
|
assertEquals("max allocation vcores in CS", 12,
|
||||||
|
cs.getMaximumResourceCapability().getVirtualCores());
|
||||||
|
assertEquals("queue A1 max MB allocation", 4096,
|
||||||
|
((LeafQueue) queueA1).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue A1 max vcores allocation", 4,
|
||||||
|
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
|
||||||
|
assertEquals("queue A2 max MB allocation", 12288,
|
||||||
|
((LeafQueue) queueA2).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue A2 max vcores allocation", 12,
|
||||||
|
((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
|
||||||
|
assertEquals("queue B2 max MB allocation", 12288,
|
||||||
|
((LeafQueue) queueB2).getMaximumAllocation().getMemory());
|
||||||
|
assertEquals("queue B2 max vcores allocation", 12,
|
||||||
|
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
maxAllocMb);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMaxAllocMb(CapacitySchedulerConfiguration conf,
|
||||||
|
String queueName, int maxAllocMb) {
|
||||||
|
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
|
||||||
|
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
|
||||||
|
conf.setInt(propName, maxAllocMb);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||||
|
maxAllocVcores);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
|
||||||
|
String queueName, int maxAllocVcores) {
|
||||||
|
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
|
||||||
|
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
|
||||||
|
conf.setInt(propName, maxAllocVcores);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,6 +226,18 @@ Hadoop MapReduce Next Generation - Capacity Scheduler
|
||||||
| | ensures that a single user can never take more than the queue's configured |
|
| | ensures that a single user can never take more than the queue's configured |
|
||||||
| | capacity irrespective of how idle th cluster is. Value is specified as |
|
| | capacity irrespective of how idle th cluster is. Value is specified as |
|
||||||
| | a float.|
|
| | a float.|
|
||||||
|
*--------------------------------------+--------------------------------------+
|
||||||
|
| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb>>> | |
|
||||||
|
| | The per queue maximum limit of memory to allocate to each container |
|
||||||
|
| | request at the Resource Manager. This setting overrides the cluster |
|
||||||
|
| | configuration <<<yarn.scheduler.maximum-allocation-mb>>>. This value |
|
||||||
|
| | must be smaller than or equal to the cluster maximum. |
|
||||||
|
*--------------------------------------+--------------------------------------+
|
||||||
|
| <<<yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores>>> | |
|
||||||
|
| | The per queue maximum limit of virtual cores to allocate to each container |
|
||||||
|
| | request at the Resource Manager. This setting overrides the cluster |
|
||||||
|
| | configuration <<<yarn.scheduler.maximum-allocation-vcores>>>. This value |
|
||||||
|
| | must be smaller than or equal to the cluster maximum. |
|
||||||
*--------------------------------------+--------------------------------------+
|
*--------------------------------------+--------------------------------------+
|
||||||
|
|
||||||
* Running and Pending Application Limits
|
* Running and Pending Application Limits
|
||||||
|
|
Loading…
Reference in New Issue