diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8d7c9435fe7..a221e84242e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -117,6 +117,9 @@ Release 2.4.0 - UNRELEASED YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where possible (Sebastian Wong via Sandy Ryza) + YARN-1403. Separate out configuration loading from QueueManager in the Fair + Scheduler (Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java new file mode 100644 index 00000000000..d12658b63aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -0,0 +1,229 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; + +public class AllocationConfiguration { + private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); + private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); + + // Minimum resource allocation for each queue + private final Map minQueueResources; + // Maximum amount of resources per queue + private final Map maxQueueResources; + // Sharing weights for each queue + private final Map queueWeights; + + // Max concurrent running applications for each queue and for each user; in addition, + // for users that have no max specified, we use the userMaxJobsDefault. + @VisibleForTesting + final Map queueMaxApps; + @VisibleForTesting + final Map userMaxApps; + private final int userMaxAppsDefault; + private final int queueMaxAppsDefault; + + // ACL's for each queue. Only specifies non-default ACL's from configuration. + private final Map> queueAcls; + + // Min share preemption timeout for each queue in seconds. If a job in the queue + // waits this long without receiving its guaranteed share, it is allowed to + // preempt other jobs' tasks. + private final Map minSharePreemptionTimeouts; + + // Default min share preemption timeout for queues where it is not set + // explicitly. + private final long defaultMinSharePreemptionTimeout; + + // Preemption timeout for jobs below fair share in seconds. If a job remains + // below half its fair share for this long, it is allowed to preempt tasks. + private final long fairSharePreemptionTimeout; + + private final Map schedulingPolicies; + + private final SchedulingPolicy defaultSchedulingPolicy; + + // Policy for mapping apps to queues + @VisibleForTesting + QueuePlacementPolicy placementPolicy; + + private final Set queueNames; + + public AllocationConfiguration(Map minQueueResources, + Map maxQueueResources, + Map queueMaxApps, Map userMaxApps, + Map queueWeights, int userMaxAppsDefault, + int queueMaxAppsDefault, Map schedulingPolicies, + SchedulingPolicy defaultSchedulingPolicy, + Map minSharePreemptionTimeouts, + Map> queueAcls, + long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, + QueuePlacementPolicy placementPolicy, Set queueNames) { + this.minQueueResources = minQueueResources; + this.maxQueueResources = maxQueueResources; + this.queueMaxApps = queueMaxApps; + this.userMaxApps = userMaxApps; + this.queueWeights = queueWeights; + this.userMaxAppsDefault = userMaxAppsDefault; + this.queueMaxAppsDefault = queueMaxAppsDefault; + this.defaultSchedulingPolicy = defaultSchedulingPolicy; + this.schedulingPolicies = schedulingPolicies; + this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; + this.queueAcls = queueAcls; + this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; + this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; + this.placementPolicy = placementPolicy; + this.queueNames = queueNames; + } + + public AllocationConfiguration(Configuration conf) { + minQueueResources = new HashMap(); + maxQueueResources = new HashMap(); + queueWeights = new HashMap(); + queueMaxApps = new HashMap(); + userMaxApps = new HashMap(); + userMaxAppsDefault = Integer.MAX_VALUE; + queueMaxAppsDefault = Integer.MAX_VALUE; + queueAcls = new HashMap>(); + minSharePreemptionTimeouts = new HashMap(); + defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + fairSharePreemptionTimeout = Long.MAX_VALUE; + schedulingPolicies = new HashMap(); + defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; + placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, + new HashSet()); + queueNames = new HashSet(); + } + + /** + * Get the ACLs associated with this queue. If a given ACL is not explicitly + * configured, include the default value for that ACL. The default for the + * root queue is everybody ("*") and the default for all other queues is + * nobody ("") + */ + public AccessControlList getQueueAcl(String queue, QueueACL operation) { + Map queueAcls = this.queueAcls.get(queue); + if (queueAcls != null) { + AccessControlList operationAcl = queueAcls.get(operation); + if (operationAcl != null) { + return operationAcl; + } + } + return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL; + } + + /** + * Get a queue's min share preemption timeout, in milliseconds. This is the + * time after which jobs in the queue may kill other queues' tasks if they + * are below their min share. + */ + public long getMinSharePreemptionTimeout(String queueName) { + Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName); + return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout + : minSharePreemptionTimeout; + } + + /** + * Get the fair share preemption, in milliseconds. This is the time + * after which any job may kill other jobs' tasks if it is below half + * its fair share. + */ + public long getFairSharePreemptionTimeout() { + return fairSharePreemptionTimeout; + } + + public ResourceWeights getQueueWeight(String queue) { + ResourceWeights weight = queueWeights.get(queue); + return (weight == null) ? ResourceWeights.NEUTRAL : weight; + } + + public int getUserMaxApps(String user) { + Integer maxApps = userMaxApps.get(user); + return (maxApps == null) ? userMaxAppsDefault : maxApps; + } + + public int getQueueMaxApps(String queue) { + Integer maxApps = queueMaxApps.get(queue); + return (maxApps == null) ? queueMaxAppsDefault : maxApps; + } + + /** + * Get the minimum resource allocation for the given queue. + * @return the cap set on this queue, or 0 if not set. + */ + public Resource getMinResources(String queue) { + Resource minQueueResource = minQueueResources.get(queue); + return (minQueueResource == null) ? Resources.none() : minQueueResource; + } + + /** + * Get the maximum resource allocation for the given queue. + * @return the cap set on this queue, or Integer.MAX_VALUE if not set. + */ + + public Resource getMaxResources(String queueName) { + Resource maxQueueResource = maxQueueResources.get(queueName); + return (maxQueueResource == null) ? Resources.unbounded() : maxQueueResource; + } + + public boolean hasAccess(String queueName, QueueACL acl, + UserGroupInformation user) { + int lastPeriodIndex = queueName.length(); + while (lastPeriodIndex != -1) { + String queue = queueName.substring(0, lastPeriodIndex); + if (getQueueAcl(queue, acl).isUserAllowed(user)) { + return true; + } + + lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1); + } + + return false; + } + + public SchedulingPolicy getSchedulingPolicy(String queueName) { + SchedulingPolicy policy = schedulingPolicies.get(queueName); + return (policy == null) ? defaultSchedulingPolicy : policy; + } + + public SchedulingPolicy getDefaultSchedulingPolicy() { + return defaultSchedulingPolicy; + } + + public Set getQueueNames() { + return queueNames; + } + + public QueuePlacementPolicy getPlacementPolicy() { + return placementPolicy; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java new file mode 100644 index 00000000000..2a7164d49b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -0,0 +1,398 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; +import org.xml.sax.SAXException; + +import com.google.common.annotations.VisibleForTesting; + +@Public +@Unstable +public class AllocationFileLoaderService extends AbstractService { + + public static final Log LOG = LogFactory.getLog( + AllocationFileLoaderService.class.getName()); + + /** Time to wait between checks of the allocation file */ + public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000; + + /** + * Time to wait after the allocation has been modified before reloading it + * (this is done to prevent loading a file that hasn't been fully written). + */ + public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000; + + private final Clock clock; + + private long lastSuccessfulReload; // Last time we successfully reloaded queues + private boolean lastReloadAttemptFailed = false; + + // Path to XML file containing allocations. + private File allocFile; + + private Listener reloadListener; + + @VisibleForTesting + long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; + + private Thread reloadThread; + private volatile boolean running = true; + + public AllocationFileLoaderService() { + this(new SystemClock()); + } + + public AllocationFileLoaderService(Clock clock) { + super(AllocationFileLoaderService.class.getName()); + this.clock = clock; + + } + + @Override + public void init(Configuration conf) { + this.allocFile = getAllocationFile(conf); + super.init(conf); + } + + @Override + public void start() { + if (allocFile == null) { + return; + } + reloadThread = new Thread() { + public void run() { + while (running) { + long time = clock.getTime(); + long lastModified = allocFile.lastModified(); + if (lastModified > lastSuccessfulReload && + time > lastModified + ALLOC_RELOAD_WAIT_MS) { + try { + reloadAllocations(); + } catch (Exception ex) { + if (!lastReloadAttemptFailed) { + LOG.error("Failed to reload fair scheduler config file - " + + "will use existing allocations.", ex); + } + lastReloadAttemptFailed = true; + } + } else if (lastModified == 0l) { + if (!lastReloadAttemptFailed) { + LOG.warn("Failed to reload fair scheduler config file because" + + " last modified returned 0. File exists: " + allocFile.exists()); + } + lastReloadAttemptFailed = true; + } + try { + Thread.sleep(reloadIntervalMs); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting to reload alloc configuration"); + } + } + } + }; + reloadThread.setName("AllocationFileReloader"); + reloadThread.setDaemon(true); + reloadThread.start(); + super.start(); + } + + @Override + public void stop() { + running = false; + reloadThread.interrupt(); + super.stop(); + } + + /** + * Path to XML file containing allocations. If the + * path is relative, it is searched for in the + * classpath, but loaded like a regular File. + */ + public File getAllocationFile(Configuration conf) { + String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE, + FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE); + File allocFile = new File(allocFilePath); + if (!allocFile.isAbsolute()) { + URL url = Thread.currentThread().getContextClassLoader() + .getResource(allocFilePath); + if (url == null) { + LOG.warn(allocFilePath + " not found on the classpath."); + allocFile = null; + } else if (!url.getProtocol().equalsIgnoreCase("file")) { + throw new RuntimeException("Allocation file " + url + + " found on the classpath is not on the local filesystem."); + } else { + allocFile = new File(url.getPath()); + } + } + return allocFile; + } + + public synchronized void setReloadListener(Listener reloadListener) { + this.reloadListener = reloadListener; + } + + /** + * Updates the allocation list from the allocation config file. This file is + * expected to be in the XML format specified in the design doc. + * + * @throws IOException if the config file cannot be read. + * @throws AllocationConfigurationException if allocations are invalid. + * @throws ParserConfigurationException if XML parser is misconfigured. + * @throws SAXException if config file is malformed. + */ + public synchronized void reloadAllocations() throws IOException, + ParserConfigurationException, SAXException, AllocationConfigurationException { + if (allocFile == null) { + return; + } + LOG.info("Loading allocation file " + allocFile); + // Create some temporary hashmaps to hold the new allocs, and we only save + // them in our fields if we have parsed the entire allocs file successfully. + Map minQueueResources = new HashMap(); + Map maxQueueResources = new HashMap(); + Map queueMaxApps = new HashMap(); + Map userMaxApps = new HashMap(); + Map queueWeights = new HashMap(); + Map queuePolicies = new HashMap(); + Map minSharePreemptionTimeouts = new HashMap(); + Map> queueAcls = + new HashMap>(); + int userMaxAppsDefault = Integer.MAX_VALUE; + int queueMaxAppsDefault = Integer.MAX_VALUE; + long fairSharePreemptionTimeout = Long.MAX_VALUE; + long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; + + QueuePlacementPolicy newPlacementPolicy = null; + + // Remember all queue names so we can display them on web UI, etc. + Set queueNamesInAllocFile = new HashSet(); + + // Read and parse the allocations file. + DocumentBuilderFactory docBuilderFactory = + DocumentBuilderFactory.newInstance(); + docBuilderFactory.setIgnoringComments(true); + DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); + Document doc = builder.parse(allocFile); + Element root = doc.getDocumentElement(); + if (!"allocations".equals(root.getTagName())) + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: top-level element not "); + NodeList elements = root.getChildNodes(); + List queueElements = new ArrayList(); + Element placementPolicyElement = null; + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element)node; + if ("queue".equals(element.getTagName()) || + "pool".equals(element.getTagName())) { + queueElements.add(element); + } else if ("user".equals(element.getTagName())) { + String userName = element.getAttribute("name"); + NodeList fields = element.getChildNodes(); + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("maxRunningApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + userMaxApps.put(userName, val); + } + } + } else if ("userMaxAppsDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + userMaxAppsDefault = val; + } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + fairSharePreemptionTimeout = val; + } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + defaultMinSharePreemptionTimeout = val; + } else if ("queueMaxAppsDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + queueMaxAppsDefault = val; + } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) + || "defaultQueueSchedulingMode".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + defaultSchedPolicy = SchedulingPolicy.parse(text); + } else if ("queuePlacementPolicy".equals(element.getTagName())) { + placementPolicyElement = element; + } else { + LOG.warn("Bad element in allocations file: " + element.getTagName()); + } + } + } + + // Load queue elements. A root queue can either be included or omitted. If + // it's included, all other queues must be inside it. + for (Element element : queueElements) { + String parent = "root"; + if (element.getAttribute("name").equalsIgnoreCase("root")) { + if (queueElements.size() > 1) { + throw new AllocationConfigurationException("If configuring root queue," + + " no other queues can be placed alongside it."); + } + parent = null; + } + loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, + userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, + queueAcls, queueNamesInAllocFile); + } + + // Load placement policy and pass it configured queues + Configuration conf = getConfig(); + if (placementPolicyElement != null) { + newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, + queueNamesInAllocFile, conf); + } else { + newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, + queueNamesInAllocFile); + } + + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, + queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, + queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, + newPlacementPolicy, queueNamesInAllocFile); + + lastSuccessfulReload = clock.getTime(); + lastReloadAttemptFailed = false; + + reloadListener.onReload(info); + } + + /** + * Loads a queue from a queue element in the configuration file + */ + private void loadQueue(String parentName, Element element, Map minQueueResources, + Map maxQueueResources, Map queueMaxApps, + Map userMaxApps, Map queueWeights, + Map queuePolicies, + Map minSharePreemptionTimeouts, + Map> queueAcls, Set queueNamesInAllocFile) + throws AllocationConfigurationException { + String queueName = element.getAttribute("name"); + if (parentName != null) { + queueName = parentName + "." + queueName; + } + Map acls = + new HashMap(); + NodeList fields = element.getChildNodes(); + boolean isLeaf = true; + + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("minResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); + minQueueResources.put(queueName, val); + } else if ("maxResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); + maxQueueResources.put(queueName, val); + } else if ("maxRunningApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + queueMaxApps.put(queueName, val); + } else if ("weight".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + double val = Double.parseDouble(text); + queueWeights.put(queueName, new ResourceWeights((float)val)); + } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + minSharePreemptionTimeouts.put(queueName, val); + } else if ("schedulingPolicy".equals(field.getTagName()) + || "schedulingMode".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + SchedulingPolicy policy = SchedulingPolicy.parse(text); + queuePolicies.put(queueName, policy); + } else if ("aclSubmitApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData(); + acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); + } else if ("aclAdministerApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData(); + acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); + } else if ("queue".endsWith(field.getTagName()) || + "pool".equals(field.getTagName())) { + loadQueue(queueName, field, minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueWeights, queuePolicies, + minSharePreemptionTimeouts, + queueAcls, queueNamesInAllocFile); + isLeaf = false; + } + } + if (isLeaf) { + queueNamesInAllocFile.add(queueName); + } + queueAcls.put(queueName, acls); + if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) + && !Resources.fitsIn(minQueueResources.get(queueName), + maxQueueResources.get(queueName))) { + LOG.warn(String.format("Queue %s has max resources %d less than min resources %d", + queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); + } + } + + public interface Listener { + public void onReload(AllocationConfiguration info); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index a6fbedbc52d..1257cba1fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -46,19 +46,15 @@ public class FSLeafQueue extends FSQueue { private final List nonRunnableAppScheds = new ArrayList(); - private final FairScheduler scheduler; - private final QueueManager queueMgr; private Resource demand = Resources.createResource(0); // Variables used for preemption private long lastTimeAtMinShare; private long lastTimeAtHalfFairShare; - public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler, + public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { - super(name, queueMgr, scheduler, parent); - this.scheduler = scheduler; - this.queueMgr = queueMgr; + super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); } @@ -145,7 +141,8 @@ public class FSLeafQueue extends FSQueue { public void updateDemand() { // Compute demand by iterating through apps in the queue // Limit demand to maxResources - Resource maxRes = queueMgr.getMaxResources(getName()); + Resource maxRes = scheduler.getAllocationConfiguration() + .getMaxResources(getName()); demand = Resources.createResource(0); for (AppSchedulable sched : runnableAppScheds) { if (Resources.equals(demand, maxRes)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 45d2811919f..0a9272594c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -41,14 +41,12 @@ public class FSParentQueue extends FSQueue { private final List childQueues = new ArrayList(); - private final QueueManager queueMgr; private Resource demand = Resources.createResource(0); private int runnableApps; - public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler, + public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { - super(name, queueMgr, scheduler, parent); - this.queueMgr = queueMgr; + super(name, scheduler, parent); } public void addChildQueue(FSQueue child) { @@ -82,7 +80,8 @@ public class FSParentQueue extends FSQueue { public void updateDemand() { // Compute demand by iterating through apps in the queue // Limit demand to maxResources - Resource maxRes = queueMgr.getMaxResources(getName()); + Resource maxRes = scheduler.getAllocationConfiguration() + .getMaxResources(getName()); demand = Resources.createResource(0); for (FSQueue childQueue : childQueues) { childQueue.updateDemand(); @@ -164,8 +163,8 @@ public class FSParentQueue extends FSQueue { public void setPolicy(SchedulingPolicy policy) throws AllocationConfigurationException { boolean allowed = - SchedulingPolicy.isApplicableTo(policy, (this == queueMgr - .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT + SchedulingPolicy.isApplicableTo(policy, (parent == null) + ? SchedulingPolicy.DEPTH_ROOT : SchedulingPolicy.DEPTH_INTERMEDIATE); if (!allowed) { throwPolicyDoesnotApplyException(policy); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 94c8f70a9aa..8b3d90d7666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -39,20 +39,17 @@ import org.apache.hadoop.yarn.util.resource.Resources; @Unstable public abstract class FSQueue extends Schedulable implements Queue { private final String name; - private final QueueManager queueMgr; - private final FairScheduler scheduler; + protected final FairScheduler scheduler; private final FSQueueMetrics metrics; protected final FSParentQueue parent; protected final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - protected SchedulingPolicy policy = SchedulingPolicy.getDefault(); + protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY; - public FSQueue(String name, QueueManager queueMgr, - FairScheduler scheduler, FSParentQueue parent) { + public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; - this.queueMgr = queueMgr; this.scheduler = scheduler; this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf()); metrics.setMinShare(getMinShare()); @@ -88,17 +85,17 @@ public abstract class FSQueue extends Schedulable implements Queue { @Override public ResourceWeights getWeights() { - return queueMgr.getQueueWeight(getName()); + return scheduler.getAllocationConfiguration().getQueueWeight(getName()); } @Override public Resource getMinShare() { - return queueMgr.getMinResources(getName()); + return scheduler.getAllocationConfiguration().getMinResources(getName()); } @Override public Resource getMaxShare() { - return queueMgr.getMaxResources(getName()); + return scheduler.getAllocationConfiguration().getMaxResources(getName()); } @Override @@ -148,13 +145,7 @@ public abstract class FSQueue extends Schedulable implements Queue { } public boolean hasAccess(QueueACL acl, UserGroupInformation user) { - // Check if the leaf-queue allows access - if (queueMgr.getQueueAcl(getName(), acl).isUserAllowed(user)) { - return true; - } - - // Check if parent-queue allows access - return parent != null && parent.hasAccess(acl, user); + return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); } /** @@ -181,7 +172,7 @@ public abstract class FSQueue extends Schedulable implements Queue { */ protected boolean assignContainerPreCheck(FSSchedulerNode node) { if (!Resources.fitsIn(getResourceUsage(), - queueMgr.getMaxResources(getName())) + scheduler.getAllocationConfiguration().getMaxResources(getName())) || node.getReservedContainer() != null) { return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a882113c004..a439adc68b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -192,11 +192,16 @@ public class FairScheduler implements ResourceScheduler { @VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; + + private AllocationFileLoaderService allocsLoader; + @VisibleForTesting + AllocationConfiguration allocConf; public FairScheduler() { clock = new SystemClock(); + allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); - maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr); + maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } private void validateConf(Configuration conf) { @@ -275,7 +280,6 @@ public class FairScheduler implements ResourceScheduler { * required resources per job. */ protected synchronized void update() { - queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file updatePreemptionVariables(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -480,8 +484,8 @@ public class FairScheduler implements ResourceScheduler { */ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { String queue = sched.getName(); - long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue); - long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout(); + long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue); + long fairShareTimeout = allocConf.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { @@ -650,8 +654,8 @@ public class FairScheduler implements ResourceScheduler { FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { FSLeafQueue queue = null; try { - QueuePlacementPolicy policy = queueMgr.getPlacementPolicy(); - queueName = policy.assignAppToQueue(queueName, user); + QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); + queueName = placementPolicy.assignAppToQueue(queueName, user); if (queueName == null) { return null; } @@ -1128,27 +1132,27 @@ public class FairScheduler implements ResourceScheduler { @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - if (!initialized) { + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; this.eventLog = new FairSchedulerEventLog(); @@ -1156,8 +1160,9 @@ public class FairScheduler implements ResourceScheduler { initialized = true; + allocConf = new AllocationConfiguration(conf); try { - queueMgr.initialize(); + queueMgr.initialize(conf); } catch (Exception e) { throw new IOException("Failed to start FairScheduler", e); } @@ -1181,12 +1186,24 @@ public class FairScheduler implements ResourceScheduler { schedulingThread.setDaemon(true); schedulingThread.start(); } - } else { + + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. try { - queueMgr.reloadAllocs(); + allocsLoader.reloadAllocations(); } catch (Exception e) { throw new IOException("Failed to initialize FairScheduler", e); } + allocsLoader.start(); + } else { + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + LOG.error("Failed to reload allocations file", e); + } } } @@ -1230,5 +1247,24 @@ public class FairScheduler implements ResourceScheduler { } return queue.hasAccess(acl, callerUGI); } + + public AllocationConfiguration getAllocationConfiguration() { + return allocConf; + } + + private class AllocationReloadListener implements + AllocationFileLoaderService.Listener { + + @Override + public void onReload(AllocationConfiguration queueInfo) { + // Commit the reload; also create any queue defined in the alloc file + // if it does not already exist, so it can be displayed on the web UI. + synchronized (FairScheduler.this) { + allocConf = queueInfo; + allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); + queueMgr.updateAllocationConfiguration(allocConf); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 955b102fee4..ec45cca158e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -18,7 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.File; -import java.net.URL; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -151,14 +152,6 @@ public class FairSchedulerConfiguration extends Configuration { return Resources.createResource(incrementMemory, incrementCores); } - public boolean getAllowUndeclaredPools() { - return getBoolean(ALLOW_UNDECLARED_POOLS, DEFAULT_ALLOW_UNDECLARED_POOLS); - } - - public boolean getUserAsDefaultQueue() { - return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE); - } - public float getLocalityThresholdNode() { return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE); } @@ -199,30 +192,6 @@ public class FairSchedulerConfiguration extends Configuration { return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT); } - /** - * Path to XML file containing allocations. If the - * path is relative, it is searched for in the - * classpath, but loaded like a regular File. - */ - public File getAllocationFile() { - String allocFilePath = get(ALLOCATION_FILE, DEFAULT_ALLOCATION_FILE); - File allocFile = new File(allocFilePath); - if (!allocFile.isAbsolute()) { - URL url = Thread.currentThread().getContextClassLoader() - .getResource(allocFilePath); - if (url == null) { - LOG.warn(allocFilePath + " not found on the classpath."); - allocFile = null; - } else if (!url.getProtocol().equalsIgnoreCase("file")) { - throw new RuntimeException("Allocation file " + url - + " found on the classpath is not on the local filesystem."); - } else { - allocFile = new File(url.getPath()); - } - } - return allocFile; - } - public String getEventlogDir() { return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir", "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index e601086b8c4..35bca321f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -33,15 +33,15 @@ import com.google.common.collect.ListMultimap; * constraints */ public class MaxRunningAppsEnforcer { - private final QueueManager queueMgr; + private final FairScheduler scheduler; // Tracks the number of running applications by user. private final Map usersNumRunnableApps; @VisibleForTesting final ListMultimap usersNonRunnableApps; - public MaxRunningAppsEnforcer(QueueManager queueMgr) { - this.queueMgr = queueMgr; + public MaxRunningAppsEnforcer(FairScheduler scheduler) { + this.scheduler = scheduler; this.usersNumRunnableApps = new HashMap(); this.usersNonRunnableApps = ArrayListMultimap.create(); } @@ -51,16 +51,17 @@ public class MaxRunningAppsEnforcer { * maxRunningApps limits. */ public boolean canAppBeRunnable(FSQueue queue, String user) { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); Integer userNumRunnable = usersNumRunnableApps.get(user); if (userNumRunnable == null) { userNumRunnable = 0; } - if (userNumRunnable >= queueMgr.getUserMaxApps(user)) { + if (userNumRunnable >= allocConf.getUserMaxApps(user)) { return false; } // Check queue and all parent queues while (queue != null) { - int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName()); + int queueMaxApps = allocConf.getQueueMaxApps(queue.getName()); if (queue.getNumRunnableApps() >= queueMaxApps) { return false; } @@ -107,6 +108,8 @@ public class MaxRunningAppsEnforcer { * highest queue that went from having no slack to having slack. */ public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + // Update usersRunnableApps String user = app.getUser(); int newUserNumRunning = usersNumRunnableApps.get(user) - 1; @@ -127,10 +130,10 @@ public class MaxRunningAppsEnforcer { // that was at its maxRunningApps before the removal. FSLeafQueue queue = app.getQueue(); FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() == - queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null; + allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null; FSParentQueue parent = queue.getParent(); while (parent != null) { - if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent + if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent .getName())) { highestQueueWithAppsNowRunnable = parent; } @@ -149,7 +152,7 @@ public class MaxRunningAppsEnforcer { gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable, appsNowMaybeRunnable); } - if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) { + if (newUserNumRunning == allocConf.getUserMaxApps(user) - 1) { List userWaitingApps = usersNonRunnableApps.get(user); if (userWaitingApps != null) { appsNowMaybeRunnable.add(userWaitingApps); @@ -200,7 +203,8 @@ public class MaxRunningAppsEnforcer { */ private void gatherPossiblyRunnableAppLists(FSQueue queue, List> appLists) { - if (queue.getNumRunnableApps() < queueMgr.getQueueMaxApps(queue.getName())) { + if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() + .getQueueMaxApps(queue.getName())) { if (queue instanceof FSLeafQueue) { appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables()); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 95dfa4aff6e..38c338a399c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -18,20 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.io.File; import java.io.IOException; -import java.net.URL; -import java.net.URLConnection; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import org.apache.commons.logging.Log; @@ -39,21 +33,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; -import org.w3c.dom.Text; import org.xml.sax.SAXException; -import com.google.common.annotations.VisibleForTesting; - /** * Maintains a list of queues as well as scheduling parameters for each queue, * such as guaranteed share allocations, from the fair scheduler config file. @@ -67,37 +49,13 @@ public class QueueManager { public static final String ROOT_QUEUE = "root"; - /** Time to wait between checks of the allocation file */ - public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000; - - /** - * Time to wait after the allocation has been modified before reloading it - * (this is done to prevent loading a file that hasn't been fully written). - */ - public static final long ALLOC_RELOAD_WAIT = 5 * 1000; - - private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); - private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); - private final FairScheduler scheduler; - // Path to XML file containing allocations. - private File allocFile; - private final Collection leafQueues = new CopyOnWriteArrayList(); private final Map queues = new HashMap(); private FSParentQueue rootQueue; - @VisibleForTesting - volatile QueueManagerInfo info = new QueueManagerInfo(); - @VisibleForTesting - volatile QueuePlacementPolicy placementPolicy; - - private long lastReloadAttempt; // Last time we tried to reload the queues file - private long lastSuccessfulReload; // Last time we successfully reloaded queues - private boolean lastReloadAttemptFailed = false; - public QueueManager(FairScheduler scheduler) { this.scheduler = scheduler; } @@ -106,45 +64,15 @@ public class QueueManager { return rootQueue; } - public void initialize() throws IOException, SAXException, - AllocationConfigurationException, ParserConfigurationException { - FairSchedulerConfiguration conf = scheduler.getConf(); - rootQueue = new FSParentQueue("root", this, scheduler, null); + public void initialize(Configuration conf) throws IOException, + SAXException, AllocationConfigurationException, ParserConfigurationException { + rootQueue = new FSParentQueue("root", scheduler, null); queues.put(rootQueue.getName(), rootQueue); - this.allocFile = conf.getAllocationFile(); - placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(), - new HashSet(), conf); - - reloadAllocs(); - lastSuccessfulReload = scheduler.getClock().getTime(); - lastReloadAttempt = scheduler.getClock().getTime(); // Create the default queue getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); } - public void updatePlacementPolicy(FairSchedulerConfiguration conf) { - - } - - /** - * Construct simple queue placement policy from allow-undeclared-pools and - * user-as-default-queue. - */ - private List getSimplePlacementRules() { - boolean create = scheduler.getConf().getAllowUndeclaredPools(); - boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue(); - List rules = new ArrayList(); - rules.add(new QueuePlacementRule.Specified().initialize(create, null)); - if (userAsDefaultQueue) { - rules.add(new QueuePlacementRule.User().initialize(create, null)); - } - if (!userAsDefaultQueue || !create) { - rules.add(new QueuePlacementRule.Default().initialize(true, null)); - } - return rules; - } - /** * Get a queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a leaf queue, i.e. it already exists as a @@ -213,17 +141,30 @@ public class QueueManager { // queue to create. // Now that we know everything worked out, make all the queues // and add them to the map. + AllocationConfiguration queueConf = scheduler.getAllocationConfiguration(); FSLeafQueue leafQueue = null; for (int i = newQueueNames.size()-1; i >= 0; i--) { String queueName = newQueueNames.get(i); if (i == 0) { // First name added was the leaf queue - leafQueue = new FSLeafQueue(name, this, scheduler, parent); + leafQueue = new FSLeafQueue(name, scheduler, parent); + try { + leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); + } catch (AllocationConfigurationException ex) { + LOG.warn("Failed to set default scheduling policy " + + queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex); + } parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); } else { - FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent); + FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); + try { + newParent.setPolicy(queueConf.getDefaultSchedulingPolicy()); + } catch (AllocationConfigurationException ex) { + LOG.warn("Failed to set default scheduling policy " + + queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex); + } parent.addChildQueue(newParent); queues.put(newParent.getName(), newParent); parent = newParent; @@ -257,301 +198,6 @@ public class QueueManager { } } - public QueuePlacementPolicy getPlacementPolicy() { - return placementPolicy; - } - - /** - * Reload allocations file if it hasn't been loaded in a while - */ - public void reloadAllocsIfNecessary() { - long time = scheduler.getClock().getTime(); - if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) { - lastReloadAttempt = time; - if (null == allocFile) { - return; - } - try { - // Get last modified time of alloc file depending whether it's a String - // (for a path name) or an URL (for a classloader resource) - long lastModified = allocFile.lastModified(); - if (lastModified > lastSuccessfulReload && - time > lastModified + ALLOC_RELOAD_WAIT) { - reloadAllocs(); - lastSuccessfulReload = time; - lastReloadAttemptFailed = false; - } - } catch (Exception e) { - // Throwing the error further out here won't help - the RPC thread - // will catch it and report it in a loop. Instead, just log it and - // hope somebody will notice from the log. - // We log the error only on the first failure so we don't fill up the - // JobTracker's log with these messages. - if (!lastReloadAttemptFailed) { - LOG.error("Failed to reload fair scheduler config file - " + - "will use existing allocations.", e); - } - lastReloadAttemptFailed = true; - } - } - } - - /** - * Updates the allocation list from the allocation config file. This file is - * expected to be in the XML format specified in the design doc. - * - * @throws IOException if the config file cannot be read. - * @throws AllocationConfigurationException if allocations are invalid. - * @throws ParserConfigurationException if XML parser is misconfigured. - * @throws SAXException if config file is malformed. - */ - public void reloadAllocs() throws IOException, ParserConfigurationException, - SAXException, AllocationConfigurationException { - if (allocFile == null) return; - // Create some temporary hashmaps to hold the new allocs, and we only save - // them in our fields if we have parsed the entire allocs file successfully. - Map minQueueResources = new HashMap(); - Map maxQueueResources = new HashMap(); - Map queueMaxApps = new HashMap(); - Map userMaxApps = new HashMap(); - Map queueWeights = new HashMap(); - Map queuePolicies = new HashMap(); - Map minSharePreemptionTimeouts = new HashMap(); - Map> queueAcls = - new HashMap>(); - int userMaxAppsDefault = Integer.MAX_VALUE; - int queueMaxAppsDefault = Integer.MAX_VALUE; - long fairSharePreemptionTimeout = Long.MAX_VALUE; - long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault(); - - QueuePlacementPolicy newPlacementPolicy = null; - - // Remember all queue names so we can display them on web UI, etc. - List queueNamesInAllocFile = new ArrayList(); - - // Read and parse the allocations file. - DocumentBuilderFactory docBuilderFactory = - DocumentBuilderFactory.newInstance(); - docBuilderFactory.setIgnoringComments(true); - DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); - Document doc = builder.parse(allocFile); - Element root = doc.getDocumentElement(); - if (!"allocations".equals(root.getTagName())) - throw new AllocationConfigurationException("Bad fair scheduler config " + - "file: top-level element not "); - NodeList elements = root.getChildNodes(); - List queueElements = new ArrayList(); - Element placementPolicyElement = null; - for (int i = 0; i < elements.getLength(); i++) { - Node node = elements.item(i); - if (node instanceof Element) { - Element element = (Element)node; - if ("queue".equals(element.getTagName()) || - "pool".equals(element.getTagName())) { - queueElements.add(element); - } else if ("user".equals(element.getTagName())) { - String userName = element.getAttribute("name"); - NodeList fields = element.getChildNodes(); - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) - continue; - Element field = (Element) fieldNode; - if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - userMaxApps.put(userName, val); - } - } - } else if ("userMaxAppsDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - userMaxAppsDefault = val; - } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - fairSharePreemptionTimeout = val; - } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - defaultMinSharePreemptionTimeout = val; - } else if ("queueMaxAppsDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - queueMaxAppsDefault = val; - } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) - || "defaultQueueSchedulingMode".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - SchedulingPolicy.setDefault(text); - defaultSchedPolicy = SchedulingPolicy.getDefault(); - } else if ("queuePlacementPolicy".equals(element.getTagName())) { - placementPolicyElement = element; - } else { - LOG.warn("Bad element in allocations file: " + element.getTagName()); - } - } - } - - // Load queue elements. A root queue can either be included or omitted. If - // it's included, all other queues must be inside it. - for (Element element : queueElements) { - String parent = "root"; - if (element.getAttribute("name").equalsIgnoreCase("root")) { - if (queueElements.size() > 1) { - throw new AllocationConfigurationException("If configuring root queue," - + " no other queues can be placed alongside it."); - } - parent = null; - } - loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); - } - - // Load placement policy and pass it configured queues - if (placementPolicyElement != null) { - newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, - new HashSet(queueNamesInAllocFile), scheduler.getConf()); - } else { - newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(), - new HashSet(queueNamesInAllocFile), scheduler.getConf()); - } - - // Commit the reload; also create any queue defined in the alloc file - // if it does not already exist, so it can be displayed on the web UI. - synchronized (this) { - info = new QueueManagerInfo(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts, - queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); - placementPolicy = newPlacementPolicy; - - // Make sure all queues exist - for (String name: queueNamesInAllocFile) { - getLeafQueue(name, true); - } - - for (FSQueue queue : queues.values()) { - // Update queue metrics - FSQueueMetrics queueMetrics = queue.getMetrics(); - queueMetrics.setMinShare(queue.getMinShare()); - queueMetrics.setMaxShare(queue.getMaxShare()); - // Set scheduling policies - if (queuePolicies.containsKey(queue.getName())) { - queue.setPolicy(queuePolicies.get(queue.getName())); - } else { - queue.setPolicy(SchedulingPolicy.getDefault()); - } - } - - } - } - - /** - * Loads a queue from a queue element in the configuration file - */ - private void loadQueue(String parentName, Element element, Map minQueueResources, - Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueWeights, - Map queuePolicies, - Map minSharePreemptionTimeouts, - Map> queueAcls, List queueNamesInAllocFile) - throws AllocationConfigurationException { - String queueName = element.getAttribute("name"); - if (parentName != null) { - queueName = parentName + "." + queueName; - } - Map acls = - new HashMap(); - NodeList fields = element.getChildNodes(); - boolean isLeaf = true; - - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) - continue; - Element field = (Element) fieldNode; - if ("minResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); - minQueueResources.put(queueName, val); - } else if ("maxResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); - maxQueueResources.put(queueName, val); - } else if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - queueMaxApps.put(queueName, val); - } else if ("weight".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - double val = Double.parseDouble(text); - queueWeights.put(queueName, new ResourceWeights((float)val)); - } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - minSharePreemptionTimeouts.put(queueName, val); - } else if ("schedulingPolicy".equals(field.getTagName()) - || "schedulingMode".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - SchedulingPolicy policy = SchedulingPolicy.parse(text); - policy.initialize(scheduler.getClusterCapacity()); - queuePolicies.put(queueName, policy); - } else if ("aclSubmitApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); - } else if ("aclAdministerApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); - } else if ("queue".endsWith(field.getTagName()) || - "pool".equals(field.getTagName())) { - loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); - isLeaf = false; - } - } - if (isLeaf) { - queueNamesInAllocFile.add(queueName); - } - queueAcls.put(queueName, acls); - if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) - && !Resources.fitsIn(minQueueResources.get(queueName), - maxQueueResources.get(queueName))) { - LOG.warn(String.format("Queue %s has max resources %d less than min resources %d", - queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); - } - } - - /** - * Get the minimum resource allocation for the given queue. - * @return the cap set on this queue, or 0 if not set. - */ - public Resource getMinResources(String queue) { - Resource minQueueResource = info.minQueueResources.get(queue); - if (minQueueResource != null) { - return minQueueResource; - } else { - return Resources.createResource(0); - } - } - - /** - * Get the maximum resource allocation for the given queue. - * @return the cap set on this queue, or Integer.MAX_VALUE if not set. - */ - - public Resource getMaxResources(String queueName) { - Resource maxQueueResource = info.maxQueueResources.get(queueName); - if (maxQueueResource != null) { - return maxQueueResource; - } else { - return Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE); - } - } - /** * Get a collection of all leaf queues */ @@ -567,141 +213,27 @@ public class QueueManager { public Collection getQueues() { return queues.values(); } - - public int getUserMaxApps(String user) { - // save current info in case it gets changed under us - QueueManagerInfo info = this.info; - if (info.userMaxApps.containsKey(user)) { - return info.userMaxApps.get(user); - } else { - return info.userMaxAppsDefault; - } - } - - public int getQueueMaxApps(String queue) { - // save current info in case it gets changed under us - QueueManagerInfo info = this.info; - if (info.queueMaxApps.containsKey(queue)) { - return info.queueMaxApps.get(queue); - } else { - return info.queueMaxAppsDefault; - } - } - public ResourceWeights getQueueWeight(String queue) { - ResourceWeights weight = info.queueWeights.get(queue); - if (weight != null) { - return weight; - } else { - return ResourceWeights.NEUTRAL; - } - } - - /** - * Get a queue's min share preemption timeout, in milliseconds. This is the - * time after which jobs in the queue may kill other queues' tasks if they - * are below their min share. - */ - public long getMinSharePreemptionTimeout(String queueName) { - // save current info in case it gets changed under us - QueueManagerInfo info = this.info; - if (info.minSharePreemptionTimeouts.containsKey(queueName)) { - return info.minSharePreemptionTimeouts.get(queueName); - } - return info.defaultMinSharePreemptionTimeout; - } - - /** - * Get the fair share preemption, in milliseconds. This is the time - * after which any job may kill other jobs' tasks if it is below half - * its fair share. - */ - public long getFairSharePreemptionTimeout() { - return info.fairSharePreemptionTimeout; - } - - /** - * Get the ACLs associated with this queue. If a given ACL is not explicitly - * configured, include the default value for that ACL. The default for the - * root queue is everybody ("*") and the default for all other queues is - * nobody ("") - */ - public AccessControlList getQueueAcl(String queue, QueueACL operation) { - Map queueAcls = info.queueAcls.get(queue); - if (queueAcls == null || !queueAcls.containsKey(operation)) { - return (queue.equals(ROOT_QUEUE)) ? EVERYBODY_ACL : NOBODY_ACL; - } - return queueAcls.get(operation); - } - - static class QueueManagerInfo { - // Minimum resource allocation for each queue - public final Map minQueueResources; - // Maximum amount of resources per queue - public final Map maxQueueResources; - // Sharing weights for each queue - public final Map queueWeights; - - // Max concurrent running applications for each queue and for each user; in addition, - // for users that have no max specified, we use the userMaxJobsDefault. - public final Map queueMaxApps; - public final Map userMaxApps; - public final int userMaxAppsDefault; - public final int queueMaxAppsDefault; - - // ACL's for each queue. Only specifies non-default ACL's from configuration. - public final Map> queueAcls; - - // Min share preemption timeout for each queue in seconds. If a job in the queue - // waits this long without receiving its guaranteed share, it is allowed to - // preempt other jobs' tasks. - public final Map minSharePreemptionTimeouts; - - // Default min share preemption timeout for queues where it is not set - // explicitly. - public final long defaultMinSharePreemptionTimeout; - - // Preemption timeout for jobs below fair share in seconds. If a job remains - // below half its fair share for this long, it is allowed to preempt tasks. - public final long fairSharePreemptionTimeout; - - public final SchedulingPolicy defaultSchedulingPolicy; - - public QueueManagerInfo(Map minQueueResources, - Map maxQueueResources, - Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, - Map minSharePreemptionTimeouts, - Map> queueAcls, - long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) { - this.minQueueResources = minQueueResources; - this.maxQueueResources = maxQueueResources; - this.queueMaxApps = queueMaxApps; - this.userMaxApps = userMaxApps; - this.queueWeights = queueWeights; - this.userMaxAppsDefault = userMaxAppsDefault; - this.queueMaxAppsDefault = queueMaxAppsDefault; - this.defaultSchedulingPolicy = defaultSchedulingPolicy; - this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; - this.queueAcls = queueAcls; - this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; - this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; + public void updateAllocationConfiguration(AllocationConfiguration queueConf) { + // Make sure all queues exist + for (String name : queueConf.getQueueNames()) { + getLeafQueue(name, true); } - public QueueManagerInfo() { - minQueueResources = new HashMap(); - maxQueueResources = new HashMap(); - queueWeights = new HashMap(); - queueMaxApps = new HashMap(); - userMaxApps = new HashMap(); - userMaxAppsDefault = Integer.MAX_VALUE; - queueMaxAppsDefault = Integer.MAX_VALUE; - queueAcls = new HashMap>(); - minSharePreemptionTimeouts = new HashMap(); - defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - fairSharePreemptionTimeout = Long.MAX_VALUE; - defaultSchedulingPolicy = SchedulingPolicy.getDefault(); + for (FSQueue queue : queues.values()) { + // Update queue metrics + FSQueueMetrics queueMetrics = queue.getMetrics(); + queueMetrics.setMinShare(queue.getMinShare()); + queueMetrics.setMaxShare(queue.getMaxShare()); + // Set scheduling policies + try { + SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); + policy.initialize(scheduler.getClusterCapacity()); + queue.setPolicy(policy); + } catch (AllocationConfigurationException ex) { + LOG.warn("Cannot apply configured scheduling policy to queue " + + queue.getName(), ex); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java index 4bf6b613166..d802e709644 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java @@ -94,6 +94,34 @@ public class QueuePlacementPolicy { return new QueuePlacementPolicy(rules, configuredQueues, conf); } + /** + * Build a simple queue placement policy from the allow-undeclared-pools and + * user-as-default-queue configuration options. + */ + public static QueuePlacementPolicy fromConfiguration(Configuration conf, + Set configuredQueues) { + boolean create = conf.getBoolean( + FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, + FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS); + boolean userAsDefaultQueue = conf.getBoolean( + FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, + FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE); + List rules = new ArrayList(); + rules.add(new QueuePlacementRule.Specified().initialize(create, null)); + if (userAsDefaultQueue) { + rules.add(new QueuePlacementRule.User().initialize(create, null)); + } + if (!userAsDefaultQueue || !create) { + rules.add(new QueuePlacementRule.Default().initialize(true, null)); + } + try { + return new QueuePlacementPolicy(rules, configuredQueues, conf); + } catch (AllocationConfigurationException ex) { + throw new RuntimeException("Should never hit exception when loading" + + "placement policy from conf", ex); + } + } + /** * Applies this rule to an app with the given requested queue and user/group * information. @@ -120,4 +148,8 @@ public class QueuePlacementPolicy { throw new IllegalStateException("Should have applied a rule before " + "reaching here"); } + + public List getRules() { + return rules; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 06f384045e0..549b85c380f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -35,7 +35,7 @@ public abstract class SchedulingPolicy { private static final ConcurrentHashMap, SchedulingPolicy> instances = new ConcurrentHashMap, SchedulingPolicy>(); - private static SchedulingPolicy DEFAULT_POLICY = + public static final SchedulingPolicy DEFAULT_POLICY = getInstance(FairSharePolicy.class); public static final byte DEPTH_LEAF = (byte) 1; @@ -44,15 +44,6 @@ public abstract class SchedulingPolicy { public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate public static final byte DEPTH_ANY = (byte) 7; - public static SchedulingPolicy getDefault() { - return DEFAULT_POLICY; - } - - public static void setDefault(String className) - throws AllocationConfigurationException { - DEFAULT_POLICY = parse(className); - } - /** * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index fc4732d6bfe..1c5a79bed91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -29,10 +29,10 @@ import javax.xml.bind.annotation.XmlSeeAlso; import javax.xml.bind.annotation.XmlTransient; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager; import org.apache.hadoop.yarn.util.resource.Resources; @XmlRootElement @@ -65,7 +65,7 @@ public class FairSchedulerQueueInfo { } public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { - QueueManager manager = scheduler.getQueueManager(); + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); queueName = queue.getName(); schedulingPolicy = queue.getPolicy().getName(); @@ -87,7 +87,7 @@ public class FairSchedulerQueueInfo { fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory(); fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory(); - maxApps = manager.getQueueMaxApps(queueName); + maxApps = allocConf.getQueueMaxApps(queueName); Collection children = queue.getChildQueues(); childQueues = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java new file mode 100644 index 00000000000..dc28a3b3ff4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -0,0 +1,432 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static junit.framework.Assert.*; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Test; + +public class TestAllocationFileLoaderService { + + final static String TEST_DIR = new File(System.getProperty("test.build.data", + "/tmp")).getAbsolutePath(); + + final static String ALLOC_FILE = new File(TEST_DIR, + "test-queues").getAbsolutePath(); + + private class MockClock implements Clock { + private long time = 0; + @Override + public long getTime() { + return time; + } + + public void tick(long ms) { + time += ms; + } + } + + @Test + public void testGetAllocationFileFromClasspath() { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + "test-fair-scheduler.xml"); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + File allocationFile = allocLoader.getAllocationFile(conf); + assertEquals("test-fair-scheduler.xml", allocationFile.getName()); + assertTrue(allocationFile.exists()); + } + + @Test (timeout = 10000) + public void testReload() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println(" 1"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.close(); + + MockClock clock = new MockClock(); + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService( + clock); + allocLoader.reloadIntervalMs = 5; + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration allocConf = confHolder.allocConf; + + // Verify conf + QueuePlacementPolicy policy = allocConf.getPlacementPolicy(); + List rules = policy.getRules(); + assertEquals(1, rules.size()); + assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass()); + assertEquals(1, allocConf.getQueueMaxApps("root.queueA")); + assertEquals(2, allocConf.getQueueNames().size()); + assertTrue(allocConf.getQueueNames().contains("root.queueA")); + assertTrue(allocConf.getQueueNames().contains("root.queueB")); + + confHolder.allocConf = null; + + // Modify file and advance the clock + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println(" 3"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.close(); + + clock.tick(System.currentTimeMillis() + + AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000); + allocLoader.start(); + + while (confHolder.allocConf == null) { + Thread.sleep(20); + } + + // Verify conf + allocConf = confHolder.allocConf; + policy = allocConf.getPlacementPolicy(); + rules = policy.getRules(); + assertEquals(2, rules.size()); + assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass()); + assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass()); + assertEquals(3, allocConf.getQueueMaxApps("root.queueB")); + assertEquals(1, allocConf.getQueueNames().size()); + assertTrue(allocConf.getQueueNames().contains("root.queueB")); + } + + @Test + public void testAllocationFileParsing() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + // Give queue A a minimum of 1024 M + out.println(""); + out.println("1024mb,0vcores"); + out.println(""); + // Give queue B a minimum of 2048 M + out.println(""); + out.println("2048mb,0vcores"); + out.println("alice,bob admins"); + out.println("fair"); + out.println(""); + // Give queue C no minimum + out.println(""); + out.println("alice,bob admins"); + out.println(""); + // Give queue D a limit of 3 running apps + out.println(""); + out.println("3"); + out.println(""); + // Give queue E a preemption timeout of one minute + out.println(""); + out.println("60"); + out.println(""); + // Set default limit of apps per queue to 15 + out.println("15"); + // Set default limit of apps per user to 5 + out.println("5"); + // Give user1 a limit of 10 jobs + out.println(""); + out.println("10"); + out.println(""); + // Set default min share preemption timeout to 2 minutes + out.println("120" + + ""); + // Set fair share preemption timeout to 5 minutes + out.println("300"); + // Set default scheduling policy to DRF + out.println("drf"); + out.println(""); + out.close(); + + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration queueConf = confHolder.allocConf; + + assertEquals(5, queueConf.getQueueNames().size()); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + + assertEquals(Resources.createResource(1024, 0), + queueConf.getMinResources("root.queueA")); + assertEquals(Resources.createResource(2048, 0), + queueConf.getMinResources("root.queueB")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueC")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueD")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueE")); + + assertEquals(15, queueConf.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(15, queueConf.getQueueMaxApps("root.queueA")); + assertEquals(15, queueConf.getQueueMaxApps("root.queueB")); + assertEquals(15, queueConf.getQueueMaxApps("root.queueC")); + assertEquals(3, queueConf.getQueueMaxApps("root.queueD")); + assertEquals(15, queueConf.getQueueMaxApps("root.queueE")); + assertEquals(10, queueConf.getUserMaxApps("user1")); + assertEquals(5, queueConf.getUserMaxApps("user2")); + + // Root should get * ACL + assertEquals("*", queueConf.getQueueAcl("root", + QueueACL.ADMINISTER_QUEUE).getAclString()); + assertEquals("*", queueConf.getQueueAcl("root", + QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + // Unspecified queues should get default ACL + assertEquals(" ", queueConf.getQueueAcl("root.queueA", + QueueACL.ADMINISTER_QUEUE).getAclString()); + assertEquals(" ", queueConf.getQueueAcl("root.queueA", + QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + // Queue B ACL + assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueB", + QueueACL.ADMINISTER_QUEUE).getAclString()); + + // Queue C ACL + assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC", + QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); + assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); + + // Verify existing queues have default scheduling policy + assertEquals(DominantResourceFairnessPolicy.NAME, + queueConf.getSchedulingPolicy("root").getName()); + assertEquals(DominantResourceFairnessPolicy.NAME, + queueConf.getSchedulingPolicy("root.queueA").getName()); + // Verify default is overriden if specified explicitly + assertEquals(FairSharePolicy.NAME, + queueConf.getSchedulingPolicy("root.queueB").getName()); + // Verify new queue gets default scheduling policy + assertEquals(DominantResourceFairnessPolicy.NAME, + queueConf.getSchedulingPolicy("root.newqueue").getName()); + } + + @Test + public void testBackwardsCompatibleAllocationFileParsing() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + // Give queue A a minimum of 1024 M + out.println(""); + out.println("1024mb,0vcores"); + out.println(""); + // Give queue B a minimum of 2048 M + out.println(""); + out.println("2048mb,0vcores"); + out.println("alice,bob admins"); + out.println(""); + // Give queue C no minimum + out.println(""); + out.println("alice,bob admins"); + out.println(""); + // Give queue D a limit of 3 running apps + out.println(""); + out.println("3"); + out.println(""); + // Give queue E a preemption timeout of one minute + out.println(""); + out.println("60"); + out.println(""); + // Set default limit of apps per queue to 15 + out.println("15"); + // Set default limit of apps per user to 5 + out.println("5"); + // Give user1 a limit of 10 jobs + out.println(""); + out.println("10"); + out.println(""); + // Set default min share preemption timeout to 2 minutes + out.println("120" + + ""); + // Set fair share preemption timeout to 5 minutes + out.println("300"); + out.println(""); + out.close(); + + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration queueConf = confHolder.allocConf; + + assertEquals(5, queueConf.getQueueNames().size()); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + + assertEquals(Resources.createResource(1024, 0), + queueConf.getMinResources("root.queueA")); + assertEquals(Resources.createResource(2048, 0), + queueConf.getMinResources("root.queueB")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueC")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueD")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueE")); + + assertEquals(15, queueConf.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(15, queueConf.getQueueMaxApps("root.queueA")); + assertEquals(15, queueConf.getQueueMaxApps("root.queueB")); + assertEquals(15, queueConf.getQueueMaxApps("root.queueC")); + assertEquals(3, queueConf.getQueueMaxApps("root.queueD")); + assertEquals(15, queueConf.getQueueMaxApps("root.queueE")); + assertEquals(10, queueConf.getUserMaxApps("user1")); + assertEquals(5, queueConf.getUserMaxApps("user2")); + + // Unspecified queues should get default ACL + assertEquals(" ", queueConf.getQueueAcl("root.queueA", + QueueACL.ADMINISTER_QUEUE).getAclString()); + assertEquals(" ", queueConf.getQueueAcl("root.queueA", + QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + // Queue B ACL + assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueB", + QueueACL.ADMINISTER_QUEUE).getAclString()); + + // Queue C ACL + assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC", + QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD")); + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); + assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); + } + + @Test + public void testSimplePlacementPolicyFromConf() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration allocConf = confHolder.allocConf; + + QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); + List rules = placementPolicy.getRules(); + assertEquals(2, rules.size()); + assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass()); + assertEquals(false, rules.get(0).create); + assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass()); + } + + /** + * Verify that you can't place queues at the same level as the root queue in + * the allocations file. + */ + @Test (expected = AllocationConfigurationException.class) + public void testQueueAlongsideRoot() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + } + + private class ReloadListener implements AllocationFileLoaderService.Listener { + public AllocationConfiguration allocConf; + + @Override + public void onReload(AllocationConfiguration info) { + allocConf = info; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 5bfb182ced3..e4dc8016c12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -51,11 +51,11 @@ public class TestFSLeafQueue { scheduler.reinitialize(conf, resourceManager.getRMContext()); String queueName = "root.queue1"; - QueueManager mockMgr = mock(QueueManager.class); - when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource); - when(mockMgr.getMinResources(queueName)).thenReturn(Resources.none()); + scheduler.allocConf = mock(AllocationConfiguration.class); + when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); + when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); - schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null); + schedulable = new FSLeafQueue(queueName, scheduler, null); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b659a9b9f79..f9c67b40ed5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.NodeId; @@ -86,7 +85,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; @@ -121,6 +119,7 @@ public class TestFairScheduler { private FairScheduler scheduler; private ResourceManager resourceManager; + private Configuration conf; private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private int APP_ID = 1; // Incrementing counter for schedling apps @@ -130,7 +129,7 @@ public class TestFairScheduler { @Before public void setUp() throws IOException { scheduler = new FairScheduler(); - Configuration conf = createConfiguration(); + conf = createConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1024); @@ -145,7 +144,6 @@ public class TestFairScheduler { ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); resourceManager.getRMContext().getStateStore().start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); // to initialize the master key resourceManager.getRMContainerTokenSecretManager().rollMasterKey(); } @@ -291,7 +289,6 @@ public class TestFairScheduler { @Test(timeout=2000) public void testLoadConfigurationOnInitialize() throws IOException { - Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); conf.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 3); conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true); @@ -362,6 +359,8 @@ public class TestFairScheduler { @Test public void testAggregateCapacityTracking() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add a node RMNode node1 = MockNodes @@ -384,7 +383,9 @@ public class TestFairScheduler { } @Test - public void testSimpleFairShareCalculation() { + public void testSimpleFairShareCalculation() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024), 1, @@ -409,7 +410,9 @@ public class TestFairScheduler { } @Test - public void testSimpleHierarchicalFairShareCalculation() { + public void testSimpleHierarchicalFairShareCalculation() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add one big node (only care about aggregate capacity) int capacity = 10 * 24; RMNode node1 = @@ -440,7 +443,9 @@ public class TestFairScheduler { } @Test - public void testHierarchicalQueuesSimilarParents() { + public void testHierarchicalQueuesSimilarParents() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + QueueManager queueManager = scheduler.getQueueManager(); FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child", true); Assert.assertEquals(2, queueManager.getLeafQueues().size()); @@ -462,8 +467,9 @@ public class TestFairScheduler { } @Test - public void testSchedulerRootQueueMetrics() throws InterruptedException { - + public void testSchedulerRootQueueMetrics() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -500,7 +506,9 @@ public class TestFairScheduler { } @Test (timeout = 5000) - public void testSimpleContainerAllocation() { + public void testSimpleContainerAllocation() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add a node RMNode node1 = MockNodes @@ -546,7 +554,9 @@ public class TestFairScheduler { } @Test (timeout = 5000) - public void testSimpleContainerReservation() throws InterruptedException { + public void testSimpleContainerReservation() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add a node RMNode node1 = MockNodes @@ -598,7 +608,6 @@ public class TestFairScheduler { @Test public void testUserAsDefaultQueue() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMContext rmContext = resourceManager.getRMContext(); @@ -617,14 +626,24 @@ public class TestFairScheduler { assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) .getRunnableAppSchedulables().size()); assertEquals("root.user1", rmApp.getQueue()); - + } + + @Test + public void testNotUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); scheduler.reinitialize(conf, resourceManager.getRMContext()); - scheduler.getQueueManager().initialize(); + RMContext rmContext = resourceManager.getRMContext(); + Map appsMap = rmContext.getRMApps(); + ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); + RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, + null, null, null, ApplicationSubmissionContext.newInstance(null, null, + null, null, null, false, false, 0, null, null), null, null, 0, null); + appsMap.put(appAttemptId.getApplicationId(), rmApp); + AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent( - createAppAttemptId(2, 1), "default", "user2"); + appAttemptId, "default", "user2"); scheduler.handle(appAddedEvent2); - assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) + assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) .getRunnableAppSchedulables().size()); @@ -634,7 +653,7 @@ public class TestFairScheduler { @Test public void testEmptyQueueName() throws Exception { - Configuration conf = createConfiguration(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); // only default queue assertEquals(1, scheduler.getQueueManager().getLeafQueues().size()); @@ -653,7 +672,6 @@ public class TestFairScheduler { @Test public void testAssignToQueue() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); scheduler.reinitialize(conf, resourceManager.getRMContext()); @@ -672,9 +690,10 @@ public class TestFairScheduler { @Test public void testQueuePlacementWithPolicy() throws Exception { - Configuration conf = createConfiguration(); conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + ApplicationAttemptId appId; Map apps = scheduler.applications; @@ -684,10 +703,10 @@ public class TestFairScheduler { rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null)); rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null)); - Set queues = Sets.newHashSet("root.user1", "root.user3group", + Set queues = Sets.newHashSet("root.user1", "root.user3group", "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2"); - scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy( - rules, queues, conf); + scheduler.getAllocationConfiguration().placementPolicy = + new QueuePlacementPolicy(rules, queues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); assertEquals("root.somequeue", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user1"); @@ -706,8 +725,8 @@ public class TestFairScheduler { rules.add(new QueuePlacementRule.User().initialize(false, null)); rules.add(new QueuePlacementRule.Specified().initialize(true, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null)); - scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy( - rules, queues, conf); + scheduler.getAllocationConfiguration().placementPolicy = + new QueuePlacementPolicy(rules, queues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); assertEquals("root.user1", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "somequeue", "otheruser"); @@ -718,9 +737,7 @@ public class TestFairScheduler { @Test public void testFairShareWithMinAlloc() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -733,9 +750,8 @@ public class TestFairScheduler { out.println(""); out.println(""); out.close(); - - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -767,6 +783,8 @@ public class TestFairScheduler { */ @Test public void testQueueDemandCalculation() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11, "root.queue1", "user1"); ApplicationAttemptId id21 = createAppAttemptId(2, 1); @@ -812,6 +830,8 @@ public class TestFairScheduler { @Test public void testAppAdditionAndRemoval() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent( createAppAttemptId(1, 1), "default", "user1"); scheduler.handle(appAddedEvent1); @@ -834,133 +854,10 @@ public class TestFairScheduler { .getRunnableAppSchedulables().size()); } - @Test - public void testAllocationFileParsing() throws Exception { - Configuration conf = createConfiguration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - // Give queue A a minimum of 1024 M - out.println(""); - out.println("1024mb,0vcores"); - out.println(""); - // Give queue B a minimum of 2048 M - out.println(""); - out.println("2048mb,0vcores"); - out.println("alice,bob admins"); - out.println("fair"); - out.println(""); - // Give queue C no minimum - out.println(""); - out.println("alice,bob admins"); - out.println(""); - // Give queue D a limit of 3 running apps - out.println(""); - out.println("3"); - out.println(""); - // Give queue E a preemption timeout of one minute - out.println(""); - out.println("60"); - out.println(""); - // Set default limit of apps per queue to 15 - out.println("15"); - // Set default limit of apps per user to 5 - out.println("5"); - // Give user1 a limit of 10 jobs - out.println(""); - out.println("10"); - out.println(""); - // Set default min share preemption timeout to 2 minutes - out.println("120" - + ""); - // Set fair share preemption timeout to 5 minutes - out.println("300"); - // Set default scheduling policy to DRF - out.println("drf"); - out.println(""); - out.close(); - - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); - - assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - - assertEquals(Resources.createResource(1024, 0), - queueManager.getMinResources("root.queueA")); - assertEquals(Resources.createResource(2048, 0), - queueManager.getMinResources("root.queueB")); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root.queueC")); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root.queueD")); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root.queueE")); - - assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(15, queueManager.getQueueMaxApps("root.queueA")); - assertEquals(15, queueManager.getQueueMaxApps("root.queueB")); - assertEquals(15, queueManager.getQueueMaxApps("root.queueC")); - assertEquals(3, queueManager.getQueueMaxApps("root.queueD")); - assertEquals(15, queueManager.getQueueMaxApps("root.queueE")); - assertEquals(10, queueManager.getUserMaxApps("user1")); - assertEquals(5, queueManager.getUserMaxApps("user2")); - - // Root should get * ACL - assertEquals("*",queueManager.getQueueAcl("root", - QueueACL.ADMINISTER_QUEUE).getAclString()); - assertEquals("*", queueManager.getQueueAcl("root", - QueueACL.SUBMIT_APPLICATIONS).getAclString()); - - // Unspecified queues should get default ACL - assertEquals(" ",queueManager.getQueueAcl("root.queueA", - QueueACL.ADMINISTER_QUEUE).getAclString()); - assertEquals(" ", queueManager.getQueueAcl("root.queueA", - QueueACL.SUBMIT_APPLICATIONS).getAclString()); - - // Queue B ACL - assertEquals("alice,bob admins",queueManager.getQueueAcl("root.queueB", - QueueACL.ADMINISTER_QUEUE).getAclString()); - - // Queue C ACL - assertEquals("alice,bob admins",queueManager.getQueueAcl("root.queueC", - QueueACL.SUBMIT_APPLICATIONS).getAclString()); - - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." + - YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); - assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE")); - assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); - - // Verify existing queues have default scheduling policy - assertEquals(DominantResourceFairnessPolicy.NAME, - queueManager.getQueue("root").getPolicy().getName()); - assertEquals(DominantResourceFairnessPolicy.NAME, - queueManager.getQueue("root.queueA").getPolicy().getName()); - // Verify default is overriden if specified explicitly - assertEquals(FairSharePolicy.NAME, - queueManager.getQueue("root.queueB").getPolicy().getName()); - // Verify new queue gets default scheduling policy - assertEquals(DominantResourceFairnessPolicy.NAME, - queueManager.getLeafQueue("root.newqueue", true).getPolicy().getName()); - } - @Test public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException, AllocationConfigurationException, ParserConfigurationException { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -980,9 +877,9 @@ public class TestFairScheduler { out.println(""); out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); - Collection leafQueues = queueManager.getLeafQueues(); Assert.assertEquals(4, leafQueues.size()); Assert.assertNotNull(queueManager.getLeafQueue("queueA", false)); @@ -995,9 +892,7 @@ public class TestFairScheduler { @Test public void testConfigureRootQueue() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1014,9 +909,9 @@ public class TestFairScheduler { out.println(""); out.println(""); out.close(); - + + scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); FSQueue root = queueManager.getRootQueue(); assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy); @@ -1025,136 +920,9 @@ public class TestFairScheduler { assertNotNull(queueManager.getLeafQueue("child2", false)); } - /** - * Verify that you can't place queues at the same level as the root queue in - * the allocations file. - */ - @Test (expected = AllocationConfigurationException.class) - public void testQueueAlongsideRoot() throws Exception { - Configuration conf = createConfiguration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.println(""); - out.close(); - - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); - } - - @Test - public void testBackwardsCompatibleAllocationFileParsing() throws Exception { - Configuration conf = createConfiguration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - // Give queue A a minimum of 1024 M - out.println(""); - out.println("1024mb,0vcores"); - out.println(""); - // Give queue B a minimum of 2048 M - out.println(""); - out.println("2048mb,0vcores"); - out.println("alice,bob admins"); - out.println(""); - // Give queue C no minimum - out.println(""); - out.println("alice,bob admins"); - out.println(""); - // Give queue D a limit of 3 running apps - out.println(""); - out.println("3"); - out.println(""); - // Give queue E a preemption timeout of one minute - out.println(""); - out.println("60"); - out.println(""); - // Set default limit of apps per queue to 15 - out.println("15"); - // Set default limit of apps per user to 5 - out.println("5"); - // Give user1 a limit of 10 jobs - out.println(""); - out.println("10"); - out.println(""); - // Set default min share preemption timeout to 2 minutes - out.println("120" - + ""); - // Set fair share preemption timeout to 5 minutes - out.println("300"); - out.println(""); - out.close(); - - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); - - assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - - assertEquals(Resources.createResource(1024, 0), - queueManager.getMinResources("root.queueA")); - assertEquals(Resources.createResource(2048, 0), - queueManager.getMinResources("root.queueB")); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root.queueC")); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root.queueD")); - assertEquals(Resources.createResource(0), - queueManager.getMinResources("root.queueE")); - - assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(15, queueManager.getQueueMaxApps("root.queueA")); - assertEquals(15, queueManager.getQueueMaxApps("root.queueB")); - assertEquals(15, queueManager.getQueueMaxApps("root.queueC")); - assertEquals(3, queueManager.getQueueMaxApps("root.queueD")); - assertEquals(15, queueManager.getQueueMaxApps("root.queueE")); - assertEquals(10, queueManager.getUserMaxApps("user1")); - assertEquals(5, queueManager.getUserMaxApps("user2")); - - // Unspecified queues should get default ACL - assertEquals(" ", queueManager.getQueueAcl("root.queueA", - QueueACL.ADMINISTER_QUEUE).getAclString()); - assertEquals(" ", queueManager.getQueueAcl("root.queueA", - QueueACL.SUBMIT_APPLICATIONS).getAclString()); - - // Queue B ACL - assertEquals("alice,bob admins", queueManager.getQueueAcl("root.queueB", - QueueACL.ADMINISTER_QUEUE).getAclString()); - - // Queue C ACL - assertEquals("alice,bob admins", queueManager.getQueueAcl("root.queueC", - QueueACL.SUBMIT_APPLICATIONS).getAclString()); - - - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." + - YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); - assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE")); - assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); - } - @Test (timeout = 5000) public void testIsStarvedForMinShare() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1168,8 +936,7 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -1212,9 +979,7 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testIsStarvedForFairShare() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1228,9 +993,8 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); - + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, @@ -1277,13 +1041,9 @@ public class TestFairScheduler { * now this means decreasing order of priority. */ public void testChoiceOfPreemptedContainers() throws Exception { - Configuration conf = createConfiguration(); - conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); MockClock clock = new MockClock(); scheduler.setClock(clock); @@ -1305,9 +1065,8 @@ public class TestFairScheduler { out.println(""); out.println(""); out.close(); - - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); // Create four nodes RMNode node1 = @@ -1443,15 +1202,16 @@ public class TestFairScheduler { * Tests the timing of decision to preempt tasks. */ public void testPreemptionDecision() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); MockClock clock = new MockClock(); scheduler.setClock(clock); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); out.println(""); out.println(".25"); out.println("1024mb,0vcores"); @@ -1473,8 +1233,7 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); // Create four nodes RMNode node1 = @@ -1570,7 +1329,9 @@ public class TestFairScheduler { } @Test (timeout = 5000) - public void testMultipleContainersWaitingForReservation() { + public void testMultipleContainersWaitingForReservation() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add a node RMNode node1 = MockNodes @@ -1600,9 +1361,7 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testUserMaxRunningApps() throws Exception { // Set max running apps - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1613,8 +1372,7 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node RMNode node1 = @@ -1654,7 +1412,9 @@ public class TestFairScheduler { } @Test (timeout = 5000) - public void testReservationWhileMultiplePriorities() { + public void testReservationWhileMultiplePriorities() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + // Add a node RMNode node1 = MockNodes @@ -1717,9 +1477,7 @@ public class TestFairScheduler { @Test public void testAclSubmitApplication() throws Exception { // Set acl's - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1735,8 +1493,7 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname", 1); @@ -1751,6 +1508,8 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testMultipleNodesSingleRackRequest() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1797,6 +1556,8 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testFifoWithinQueue() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1"); @@ -1837,11 +1598,9 @@ public class TestFairScheduler { } @Test(timeout = 3000) - public void testMaxAssign() throws AllocationConfigurationException { - // set required scheduler configs - scheduler.assignMultiple = true; - scheduler.getQueueManager().getLeafQueue("root.default", true) - .setPolicy(SchedulingPolicy.getDefault()); + public void testMaxAssign() throws Exception { + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, @@ -1884,6 +1643,8 @@ public class TestFairScheduler { */ @Test(timeout = 5000) public void testAssignContainer() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + final String user = "user1"; final String fifoQueue = "fifo"; final String fairParent = "fairParent"; @@ -1951,9 +1712,7 @@ public class TestFairScheduler { @Test public void testNotAllowSubmitApplication() throws Exception { // Set acl's - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); @@ -1967,8 +1726,8 @@ public class TestFairScheduler { out.println(""); out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); int appId = this.APP_ID++; String user = "usernotallow"; @@ -2017,7 +1776,9 @@ public class TestFairScheduler { } @Test - public void testReservationThatDoesntFit() { + public void testReservationThatDoesntFit() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -2043,7 +1804,9 @@ public class TestFairScheduler { } @Test - public void testRemoveNodeUpdatesRootQueueMetrics() { + public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); @@ -2069,7 +1832,9 @@ public class TestFairScheduler { } @Test - public void testStrictLocality() { + public void testStrictLocality() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -2107,7 +1872,9 @@ public class TestFairScheduler { } @Test - public void testCancelStrictLocality() { + public void testCancelStrictLocality() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -2155,7 +1922,9 @@ public class TestFairScheduler { * a reservation on another. */ @Test - public void testReservationsStrictLocality() { + public void testReservationsStrictLocality() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1); @@ -2193,7 +1962,9 @@ public class TestFairScheduler { } @Test - public void testNoMoreCpuOnNode() { + public void testNoMoreCpuOnNode() throws IOException { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -2213,6 +1984,8 @@ public class TestFairScheduler { @Test public void testBasicDRFAssignment() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); scheduler.handle(nodeEvent); @@ -2251,6 +2024,8 @@ public class TestFairScheduler { */ @Test public void testBasicDRFWithQueues() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); @@ -2285,6 +2060,8 @@ public class TestFairScheduler { @Test public void testDRFHierarchicalQueues() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); @@ -2349,9 +2126,9 @@ public class TestFairScheduler { @Test(timeout = 30000) public void testHostPortNodeName() throws Exception { - scheduler.getConf().setBoolean(YarnConfiguration + conf.setBoolean(YarnConfiguration .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - scheduler.reinitialize(scheduler.getConf(), + scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1", 1); @@ -2426,9 +2203,7 @@ public class TestFairScheduler { @Test public void testUserAndQueueMaxRunningApps() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -2442,8 +2217,7 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1"); @@ -2479,9 +2253,7 @@ public class TestFairScheduler { @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { - Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); MockClock clock = new MockClock(); scheduler.setClock(clock); @@ -2499,8 +2271,7 @@ public class TestFairScheduler { out.println(""); out.close(); - QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); @@ -2629,10 +2400,8 @@ public class TestFairScheduler { @Test public void testDontAllowUndeclaredPools() throws Exception{ - Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - scheduler.reinitialize(conf, resourceManager.getRMContext()); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -2642,8 +2411,8 @@ public class TestFairScheduler { out.println(""); out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); - queueManager.initialize(); FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false); FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false); @@ -2672,6 +2441,8 @@ public class TestFairScheduler { @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + final int GB = 1024; String host = "127.0.0.1"; RMNode node = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java index da8a183c10a..85e1cb97144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java @@ -61,14 +61,4 @@ public class TestFairSchedulerConfiguration { parseResourceConfigValue("1o24vc0res"); } - @Test - public void testGetAllocationFileFromClasspath() { - FairSchedulerConfiguration conf = new FairSchedulerConfiguration( - new Configuration()); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, - "test-fair-scheduler.xml"); - File allocationFile = conf.getAllocationFile(); - Assert.assertEquals("test-fair-scheduler.xml", allocationFile.getName()); - Assert.assertTrue(allocationFile.exists()); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index 20f6e3d7757..e70c039dcaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -39,18 +39,21 @@ public class TestMaxRunningAppsEnforcer { @Before public void setup() throws Exception { + Configuration conf = new Configuration(); clock = new TestFairScheduler.MockClock(); FairScheduler scheduler = mock(FairScheduler.class); when(scheduler.getConf()).thenReturn( - new FairSchedulerConfiguration(new Configuration())); + new FairSchedulerConfiguration(conf)); when(scheduler.getClock()).thenReturn(clock); + AllocationConfiguration allocConf = new AllocationConfiguration( + conf); + when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); queueManager = new QueueManager(scheduler); - queueManager.initialize(); - - queueMaxApps = queueManager.info.queueMaxApps; - userMaxApps = queueManager.info.userMaxApps; - maxAppsEnforcer = new MaxRunningAppsEnforcer(queueManager); + queueManager.initialize(conf); + queueMaxApps = allocConf.queueMaxApps; + userMaxApps = allocConf.userMaxApps; + maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler); appNum = 0; }