YARN-7738. CapacityScheduler: Support refresh maximum allocation for multiple resource types. Contributed by Wangda Tan.

This commit is contained in:
Sunil G 2018-01-18 19:05:26 +05:30
parent 8e5472b1e6
commit 06cceba1cb
8 changed files with 193 additions and 39 deletions

View File

@ -369,6 +369,9 @@ public abstract class Resource implements Comparable<Resource> {
return; return;
} }
if (resource.equals(ResourceInformation.VCORES_URI)) { if (resource.equals(ResourceInformation.VCORES_URI)) {
if (value > Integer.MAX_VALUE) {
value = (long) Integer.MAX_VALUE;
}
this.setVirtualCores((int)value); this.setVirtualCores((int)value);
return; return;
} }

View File

@ -199,9 +199,23 @@ public class ResourceUtils {
} }
} }
@VisibleForTesting /**
static void initializeResourcesMap(Configuration conf) { * Get maximum allocation from config, *THIS WILL NOT UPDATE INTERNAL DATA*
* @param conf config
* @return maximum allocation
*/
public static Resource fetchMaximumAllocationFromConfig(Configuration conf) {
Map<String, ResourceInformation> resourceInformationMap =
getResourceInformationMapFromConfig(conf);
Resource ret = Resource.newInstance(0, 0);
for (ResourceInformation entry : resourceInformationMap.values()) {
ret.setResourceValue(entry.getName(), entry.getMaximumAllocation());
}
return ret;
}
private static Map<String, ResourceInformation> getResourceInformationMapFromConfig(
Configuration conf) {
Map<String, ResourceInformation> resourceInformationMap = new HashMap<>(); Map<String, ResourceInformation> resourceInformationMap = new HashMap<>();
String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES); String[] resourceNames = conf.getStrings(YarnConfiguration.RESOURCE_TYPES);
@ -247,6 +261,13 @@ public class ResourceUtils {
setAllocationForMandatoryResources(resourceInformationMap, conf); setAllocationForMandatoryResources(resourceInformationMap, conf);
return resourceInformationMap;
}
@VisibleForTesting
static void initializeResourcesMap(Configuration conf) {
Map<String, ResourceInformation> resourceInformationMap =
getResourceInformationMapFromConfig(conf);
initializeResourcesFromResourceInformationMap(resourceInformationMap); initializeResourcesFromResourceInformationMap(resourceInformationMap);
} }
@ -544,19 +565,8 @@ public class ResourceUtils {
public static Resource getResourceTypesMaximumAllocation() { public static Resource getResourceTypesMaximumAllocation() {
Resource ret = Resource.newInstance(0, 0); Resource ret = Resource.newInstance(0, 0);
for (ResourceInformation entry : resourceTypesArray) { for (ResourceInformation entry : resourceTypesArray) {
String name = entry.getName(); ret.setResourceValue(entry.getName(),
if (name.equals(ResourceInformation.MEMORY_MB.getName())) { entry.getMaximumAllocation());
ret.setMemorySize(entry.getMaximumAllocation());
} else if (name.equals(ResourceInformation.VCORES.getName())) {
Long tmp = entry.getMaximumAllocation();
if (tmp > Integer.MAX_VALUE) {
tmp = (long) Integer.MAX_VALUE;
}
ret.setVirtualCores(tmp.intValue());
continue;
} else {
ret.setResourceValue(name, entry.getMaximumAllocation());
}
} }
return ret; return ret;
} }

View File

@ -404,7 +404,8 @@ public class AdminService extends CompositeService implements
throws IOException, YarnException { throws IOException, YarnException {
// Retrieve yarn-site.xml in order to refresh scheduling monitor properties. // Retrieve yarn-site.xml in order to refresh scheduling monitor properties.
Configuration conf = getConfiguration(new Configuration(false), Configuration conf = getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); YarnConfiguration.YARN_SITE_CONFIGURATION_FILE,
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
// The reason we call Configuration#size() is because when getConfiguration // The reason we call Configuration#size() is because when getConfiguration
// been called, it invokes Configuration#addResouce, which invokes // been called, it invokes Configuration#addResouce, which invokes
// Configuration#reloadConfiguration which triggers the reload process in a // Configuration#reloadConfiguration which triggers the reload process in a

View File

@ -143,6 +143,7 @@ import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -439,12 +440,15 @@ public class CapacityScheduler extends
validateConf(this.conf); validateConf(this.conf);
try { try {
LOG.info("Re-initializing queues..."); LOG.info("Re-initializing queues...");
refreshMaximumAllocation(this.conf.getMaximumAllocation()); refreshMaximumAllocation(
ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
reinitializeQueues(this.conf); reinitializeQueues(this.conf);
} catch (Throwable t) { } catch (Throwable t) {
this.conf = oldConf; this.conf = oldConf;
refreshMaximumAllocation(this.conf.getMaximumAllocation()); refreshMaximumAllocation(
throw new IOException("Failed to re-init queues : "+ t.getMessage(), t); ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
throw new IOException("Failed to re-init queues : " + t.getMessage(),
t);
} }
// update lazy preemption // update lazy preemption

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.Schedulabl
import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList; import java.util.ArrayList;
@ -835,16 +836,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return Resources.createResource(minimumMemory, minimumCores); return Resources.createResource(minimumMemory, minimumCores);
} }
public Resource getMaximumAllocation() {
int maximumMemory = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
int maximumCores = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
return Resources.createResource(maximumMemory, maximumCores);
}
@Private @Private
public Priority getQueuePriority(String queue) { public Priority getQueuePriority(String queue) {
String queuePolicyPrefix = getQueuePrefix(queue); String queuePolicyPrefix = getQueuePrefix(queue);
@ -868,6 +859,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* @return setting specified per queue else falls back to the cluster setting * @return setting specified per queue else falls back to the cluster setting
*/ */
public Resource getMaximumAllocationPerQueue(String queue) { public Resource getMaximumAllocationPerQueue(String queue) {
// Only support to specify memory and vcores maximum allocation per queue
// for now.
String queuePrefix = getQueuePrefix(queue); String queuePrefix = getQueuePrefix(queue);
long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
(int)UNDEFINED); (int)UNDEFINED);
@ -879,7 +872,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
LOG.debug("max alloc vcores per queue for " + queue + " is " LOG.debug("max alloc vcores per queue for " + queue + " is "
+ maxAllocationVcoresPerQueue); + maxAllocationVcoresPerQueue);
} }
Resource clusterMax = getMaximumAllocation(); Resource clusterMax = ResourceUtils.fetchMaximumAllocationFromConfig(this);
if (maxAllocationMbPerQueue == (int)UNDEFINED) { if (maxAllocationMbPerQueue == (int)UNDEFINED) {
LOG.info("max alloc mb per queue for " + queue + " is undefined"); LOG.info("max alloc mb per queue for " + queue + " is undefined");
maxAllocationMbPerQueue = clusterMax.getMemorySize(); maxAllocationMbPerQueue = clusterMax.getMemorySize();
@ -888,8 +881,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
LOG.info("max alloc vcore per queue for " + queue + " is undefined"); LOG.info("max alloc vcore per queue for " + queue + " is undefined");
maxAllocationVcoresPerQueue = clusterMax.getVirtualCores(); maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
} }
Resource result = Resources.createResource(maxAllocationMbPerQueue, // Copy from clusterMax and overwrite per-queue's maximum memory/vcore
maxAllocationVcoresPerQueue); // allocation.
Resource result = Resources.clone(clusterMax);
result.setMemorySize(maxAllocationMbPerQueue);
result.setVirtualCores(maxAllocationVcoresPerQueue);
if (maxAllocationMbPerQueue > clusterMax.getMemorySize() if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
|| maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) { || maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(

View File

@ -553,8 +553,8 @@ public class LeafQueue extends AbstractCSQueue {
// since we have already told running AM's the size // since we have already told running AM's the size
Resource oldMax = getMaximumAllocation(); Resource oldMax = getMaximumAllocation();
Resource newMax = newlyParsedLeafQueue.getMaximumAllocation(); Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
if (newMax.getMemorySize() < oldMax.getMemorySize()
|| newMax.getVirtualCores() < oldMax.getVirtualCores()) { if (!Resources.fitsIn(oldMax, newMax)) {
throw new IOException("Trying to reinitialize " + getQueuePath() throw new IOException("Trying to reinitialize " + getQueuePath()
+ " the maximum allocation size can not be decreased!" + " the maximum allocation size can not be decreased!"
+ " Current setting: " + oldMax + ", trying to set it to: " + " Current setting: " + oldMax + ", trying to set it to: "

View File

@ -154,6 +154,7 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -2942,7 +2943,7 @@ public class TestCapacityScheduler {
conf.getMaximumAllocationPerQueue(A1).getMemorySize()); conf.getMaximumAllocationPerQueue(A1).getMemorySize());
assertEquals("max allocation", assertEquals("max allocation",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
conf.getMaximumAllocation().getMemorySize()); ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
CSQueue rootQueue = cs.getRootQueue(); CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A); CSQueue queueA = findQueue(rootQueue, A);
@ -3043,10 +3044,10 @@ public class TestCapacityScheduler {
conf.getMaximumAllocationPerQueue(A1).getVirtualCores()); conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
assertEquals("cluster max allocation MB", assertEquals("cluster max allocation MB",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
conf.getMaximumAllocation().getMemorySize()); ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
assertEquals("cluster max allocation vcores", assertEquals("cluster max allocation vcores",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
conf.getMaximumAllocation().getVirtualCores()); ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
CSQueue rootQueue = cs.getRootQueue(); CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A); CSQueue queueA = findQueue(rootQueue, A);
@ -3065,10 +3066,10 @@ public class TestCapacityScheduler {
conf.getMaximumAllocationPerQueue(A1).getVirtualCores()); conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
assertEquals("max allocation MB cluster", assertEquals("max allocation MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
conf.getMaximumAllocation().getMemorySize()); ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
assertEquals("max allocation vcores cluster", assertEquals("max allocation vcores cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
conf.getMaximumAllocation().getVirtualCores()); ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
assertEquals("queue max allocation MB", 6144, assertEquals("queue max allocation MB", 6144,
((LeafQueue) queueA1).getMaximumAllocation().getMemorySize()); ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
assertEquals("queue max allocation vcores", 3, assertEquals("queue max allocation vcores", 3,

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -147,4 +148,142 @@ public class TestCapacitySchedulerWithMultiResourceTypes {
TestUtils.createResource(2 * GB, 2, ImmutableMap.of(RESOURCE_1, 2)), TestUtils.createResource(2 * GB, 2, ImmutableMap.of(RESOURCE_1, 2)),
containerTokenIdentifier.getResource()); containerTokenIdentifier.getResource());
} }
@Test
public void testMaximumAllocationRefreshWithMultipleResourceTypes() throws Exception {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
ResourceTypes.COUNTABLE, 0, 3333L));
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
100.0f);
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
csconf.setResourceComparator(DominantResourceCalculator.class);
csconf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_1);
csconf.setInt(YarnConfiguration.RESOURCE_TYPES + "." + RESOURCE_1
+ ".maximum-allocation", 3333);
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource
// types
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
Assert.assertEquals(3333L,
cs.getMaximumResourceCapability().getResourceValue(RESOURCE_1));
Assert.assertEquals(3333L,
cs.getMaximumAllocation().getResourceValue(RESOURCE_1));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability()
.getResourceValue(ResourceInformation.MEMORY_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumAllocation()
.getResourceValue(ResourceInformation.MEMORY_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability()
.getResourceValue(ResourceInformation.VCORES_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumAllocation()
.getResourceValue(ResourceInformation.VCORES_URI));
// Set RES_1 to 3332 (less than 3333) and refresh CS, failures expected.
csconf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_1);
csconf.setInt(YarnConfiguration.RESOURCE_TYPES + "." + RESOURCE_1
+ ".maximum-allocation", 3332);
boolean exception = false;
try {
cs.reinitialize(csconf, rm.getRMContext());
} catch (IOException e) {
exception = true;
}
Assert.assertTrue("Should have exception in CS", exception);
// Maximum allocation won't be updated
Assert.assertEquals(3333L,
cs.getMaximumResourceCapability().getResourceValue(RESOURCE_1));
Assert.assertEquals(3333L,
cs.getMaximumAllocation().getResourceValue(RESOURCE_1));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability()
.getResourceValue(ResourceInformation.MEMORY_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumAllocation()
.getResourceValue(ResourceInformation.MEMORY_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability()
.getResourceValue(ResourceInformation.VCORES_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumAllocation()
.getResourceValue(ResourceInformation.VCORES_URI));
// Set RES_1 to 3334 and refresh CS, should success
csconf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_1);
csconf.setInt(YarnConfiguration.RESOURCE_TYPES + "." + RESOURCE_1
+ ".maximum-allocation", 3334);
cs.reinitialize(csconf, rm.getRMContext());
// Maximum allocation will be updated
Assert.assertEquals(3334,
cs.getMaximumResourceCapability().getResourceValue(RESOURCE_1));
// Since we haven't updated the real configuration of ResourceUtils,
// cs.getMaximumAllocation won't be updated.
Assert.assertEquals(3333,
cs.getMaximumAllocation().getResourceValue(RESOURCE_1));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability()
.getResourceValue(ResourceInformation.MEMORY_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumAllocation()
.getResourceValue(ResourceInformation.MEMORY_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability()
.getResourceValue(ResourceInformation.VCORES_URI));
Assert.assertEquals(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumAllocation()
.getResourceValue(ResourceInformation.VCORES_URI));
rm.close();
}
} }