YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. (Anubhav Dhoot via kasha)

This commit is contained in:
Karthik Kambatla 2014-12-19 14:23:43 -08:00
parent 6f1e3667cf
commit a22ffc3188
11 changed files with 622 additions and 13 deletions

View File

@ -67,6 +67,9 @@ Release 2.7.0 - UNRELEASED
YARN-2203. [YARN-1492] Web UI for cache manager. (Chris Trezzo via kasha) YARN-2203. [YARN-1492] Web UI for cache manager. (Chris Trezzo via kasha)
YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler.
(Anubhav Dhoot via kasha)
IMPROVEMENTS IMPROVEMENTS
YARN-2950. Change message to mandate, not suggest JS requirement on UI. YARN-2950. Change message to mandate, not suggest JS requirement on UI.

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -322,9 +323,10 @@ public abstract class AbstractReservationSystem extends AbstractService
* @param scheduler the scheduler for which the reservation system is required * @param scheduler the scheduler for which the reservation system is required
*/ */
public static String getDefaultReservationSystem(ResourceScheduler scheduler) { public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
// currently only capacity scheduler is supported
if (scheduler instanceof CapacityScheduler) { if (scheduler instanceof CapacityScheduler) {
return CapacityReservationSystem.class.getName(); return CapacityReservationSystem.class.getName();
} else if (scheduler instanceof FairScheduler) {
return FairReservationSystem.class.getName();
} }
return null; return null;
} }

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
public class FairReservationSystem extends AbstractReservationSystem {
private FairScheduler fairScheduler;
public FairReservationSystem() {
super(FairReservationSystem.class.getName());
}
@Override
public void reinitialize(Configuration conf, RMContext rmContext)
throws YarnException {
// Validate if the scheduler is fair scheduler
ResourceScheduler scheduler = rmContext.getScheduler();
if (!(scheduler instanceof FairScheduler)) {
throw new YarnRuntimeException("Class "
+ scheduler.getClass().getCanonicalName() + " not instance of "
+ FairScheduler.class.getCanonicalName());
}
fairScheduler = (FairScheduler) scheduler;
this.conf = conf;
super.reinitialize(conf, rmContext);
}
@Override
protected ReservationSchedulerConfiguration
getReservationSchedulerConfiguration() {
return fairScheduler.getAllocationConfiguration();
}
@Override
protected ResourceCalculator getResourceCalculator() {
return fairScheduler.getResourceCalculator();
}
@Override
protected QueueMetrics getRootQueueMetrics() {
return fairScheduler.getRootQueueMetrics();
}
@Override
protected Resource getMinAllocation() {
return fairScheduler.getMinimumResourceCapability();
}
@Override
protected Resource getMaxAllocation() {
return fairScheduler.getMaximumResourceCapability();
}
@Override
protected String getPlanQueuePath(String planQueueName) {
return planQueueName; }
@Override
protected Resource getPlanQueueCapacity(String planQueueName) {
return fairScheduler.getQueueManager().getParentQueue(planQueueName, false)
.getSteadyFairShare();
}
}

View File

@ -27,12 +27,13 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
public class AllocationConfiguration { public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
@ -76,6 +77,8 @@ public class AllocationConfiguration {
// preempt other queues' tasks. // preempt other queues' tasks.
private final Map<String, Float> fairSharePreemptionThresholds; private final Map<String, Float> fairSharePreemptionThresholds;
private final Set<String> reservableQueues;
private final Map<String, SchedulingPolicy> schedulingPolicies; private final Map<String, SchedulingPolicy> schedulingPolicies;
private final SchedulingPolicy defaultSchedulingPolicy; private final SchedulingPolicy defaultSchedulingPolicy;
@ -87,7 +90,10 @@ public class AllocationConfiguration {
//Configured queues in the alloc xml //Configured queues in the alloc xml
@VisibleForTesting @VisibleForTesting
Map<FSQueueType, Set<String>> configuredQueues; Map<FSQueueType, Set<String>> configuredQueues;
// Reservation system configuration
private ReservationQueueConfiguration globalReservationQueueConfig;
public AllocationConfiguration(Map<String, Resource> minQueueResources, public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
@ -101,7 +107,9 @@ public class AllocationConfiguration {
Map<String, Float> fairSharePreemptionThresholds, Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<String, Map<QueueACL, AccessControlList>> queueAcls,
QueuePlacementPolicy placementPolicy, QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues) { Map<FSQueueType, Set<String>> configuredQueues,
ReservationQueueConfiguration globalReservationQueueConfig,
Set<String> reservableQueues) {
this.minQueueResources = minQueueResources; this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources; this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps; this.queueMaxApps = queueMaxApps;
@ -117,6 +125,8 @@ public class AllocationConfiguration {
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
this.queueAcls = queueAcls; this.queueAcls = queueAcls;
this.reservableQueues = reservableQueues;
this.globalReservationQueueConfig = globalReservationQueueConfig;
this.placementPolicy = placementPolicy; this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues; this.configuredQueues = configuredQueues;
} }
@ -137,6 +147,7 @@ public class AllocationConfiguration {
fairSharePreemptionThresholds = new HashMap<String, Float>(); fairSharePreemptionThresholds = new HashMap<String, Float>();
schedulingPolicies = new HashMap<String, SchedulingPolicy>(); schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
reservableQueues = new HashSet<>();
configuredQueues = new HashMap<FSQueueType, Set<String>>(); configuredQueues = new HashMap<FSQueueType, Set<String>>();
for (FSQueueType queueType : FSQueueType.values()) { for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>()); configuredQueues.put(queueType, new HashSet<String>());
@ -262,4 +273,54 @@ public class AllocationConfiguration {
public QueuePlacementPolicy getPlacementPolicy() { public QueuePlacementPolicy getPlacementPolicy() {
return placementPolicy; return placementPolicy;
} }
@Override
public boolean isReservable(String queue) {
return reservableQueues.contains(queue);
}
@Override
public long getReservationWindow(String queue) {
return globalReservationQueueConfig.getReservationWindowMsec();
}
@Override
public float getAverageCapacity(String queue) {
return globalReservationQueueConfig.getAvgOverTimeMultiplier() * 100;
}
@Override
public float getInstantaneousMaxCapacity(String queue) {
return globalReservationQueueConfig.getMaxOverTimeMultiplier() * 100;
}
@Override
public String getReservationAdmissionPolicy(String queue) {
return globalReservationQueueConfig.getReservationAdmissionPolicy();
}
@Override
public String getReservationAgent(String queue) {
return globalReservationQueueConfig.getReservationAgent();
}
@Override
public boolean getShowReservationAsQueues(String queue) {
return globalReservationQueueConfig.shouldShowReservationAsQueues();
}
@Override
public String getReplanner(String queue) {
return globalReservationQueueConfig.getPlanner();
}
@Override
public boolean getMoveOnExpiry(String queue) {
return globalReservationQueueConfig.shouldMoveOnExpiry();
}
@Override
public long getEnforcementWindow(String queue) {
return globalReservationQueueConfig.getEnforcementWindowMsec();
}
} }

View File

@ -86,7 +86,7 @@ public class AllocationFileLoaderService extends AbstractService {
private Thread reloadThread; private Thread reloadThread;
private volatile boolean running = true; private volatile boolean running = true;
public AllocationFileLoaderService() { public AllocationFileLoaderService() {
this(new SystemClock()); this(new SystemClock());
} }
@ -222,6 +222,7 @@ public class AllocationFileLoaderService extends AbstractService {
new HashMap<String, Float>(); new HashMap<String, Float>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls = Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>(); new HashMap<String, Map<QueueACL, AccessControlList>>();
Set<String> reservableQueues = new HashSet<String>();
int userMaxAppsDefault = Integer.MAX_VALUE; int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE;
float queueMaxAMShareDefault = 0.5f; float queueMaxAMShareDefault = 0.5f;
@ -230,6 +231,11 @@ public class AllocationFileLoaderService extends AbstractService {
float defaultFairSharePreemptionThreshold = 0.5f; float defaultFairSharePreemptionThreshold = 0.5f;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
// Reservation global configuration knobs
String planner = null;
String reservationAgent = null;
String reservationAdmissionPolicy = null;
QueuePlacementPolicy newPlacementPolicy = null; QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc. // Remember all queue names so we can display them on web UI, etc.
@ -317,6 +323,15 @@ public class AllocationFileLoaderService extends AbstractService {
defaultSchedPolicy = SchedulingPolicy.parse(text); defaultSchedPolicy = SchedulingPolicy.parse(text);
} else if ("queuePlacementPolicy".equals(element.getTagName())) { } else if ("queuePlacementPolicy".equals(element.getTagName())) {
placementPolicyElement = element; placementPolicyElement = element;
} else if ("reservation-planner".equals(element.getTagName())) {
String text = ((Text) element.getFirstChild()).getData().trim();
planner = text;
} else if ("reservation-agent".equals(element.getTagName())) {
String text = ((Text) element.getFirstChild()).getData().trim();
reservationAgent = text;
} else if ("reservation-policy".equals(element.getTagName())) {
String text = ((Text) element.getFirstChild()).getData().trim();
reservationAdmissionPolicy = text;
} else { } else {
LOG.warn("Bad element in allocations file: " + element.getTagName()); LOG.warn("Bad element in allocations file: " + element.getTagName());
} }
@ -337,7 +352,8 @@ public class AllocationFileLoaderService extends AbstractService {
loadQueue(parent, element, minQueueResources, maxQueueResources, loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, configuredQueues); fairSharePreemptionThresholds, queueAcls, configuredQueues,
reservableQueues);
} }
// Load placement policy and pass it configured queues // Load placement policy and pass it configured queues
@ -366,13 +382,27 @@ public class AllocationFileLoaderService extends AbstractService {
defaultFairSharePreemptionThreshold); defaultFairSharePreemptionThreshold);
} }
ReservationQueueConfiguration globalReservationQueueConfig = new
ReservationQueueConfiguration();
if (planner != null) {
globalReservationQueueConfig.setPlanner(planner);
}
if (reservationAdmissionPolicy != null) {
globalReservationQueueConfig.setReservationAdmissionPolicy
(reservationAdmissionPolicy);
}
if (reservationAgent != null) {
globalReservationQueueConfig.setReservationAgent(reservationAgent);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, queueMaxApps, userMaxApps, queueWeights, maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
minSharePreemptionTimeouts, fairSharePreemptionTimeouts, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, fairSharePreemptionThresholds, queueAcls,
newPlacementPolicy, configuredQueues); newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
reservableQueues);
lastSuccessfulReload = clock.getTime(); lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false; lastReloadAttemptFailed = false;
@ -392,8 +422,9 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts, Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds, Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues) Map<FSQueueType, Set<String>> configuredQueues,
Set<String> reservableQueues)
throws AllocationConfigurationException { throws AllocationConfigurationException {
String queueName = element.getAttribute("name"); String queueName = element.getAttribute("name");
@ -460,14 +491,17 @@ public class AllocationFileLoaderService extends AbstractService {
} else if ("aclAdministerApps".equals(field.getTagName())) { } else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData(); String text = ((Text)field.getFirstChild()).getData();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
} else if ("reservation".equals(field.getTagName())) {
isLeaf = false;
reservableQueues.add(queueName);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
} else if ("queue".endsWith(field.getTagName()) || } else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) { "pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources, loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
queueAcls, configuredQueues); queueAcls, configuredQueues, reservableQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false; isLeaf = false;
} }
} }
@ -479,6 +513,13 @@ public class AllocationFileLoaderService extends AbstractService {
} else { } else {
configuredQueues.get(FSQueueType.LEAF).add(queueName); configuredQueues.get(FSQueueType.LEAF).add(queueName);
} }
} else {
if ("parent".equals(element.getAttribute("type"))) {
throw new AllocationConfigurationException("Both <reservation> and " +
"type=\"parent\" found for queue " + queueName + " which is " +
"unsupported");
}
configuredQueues.get(FSQueueType.PARENT).add(queueName);
} }
queueAcls.put(queueName, acls); queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && if (maxQueueResources.containsKey(queueName) &&

View File

@ -1568,4 +1568,16 @@ public class FairScheduler extends
return EnumSet return EnumSet
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
} }
@Override
public Set<String> getPlanQueues() throws YarnException {
Set<String> planQueues = new HashSet<String>();
for (FSQueue fsQueue : queueMgr.getQueues()) {
String queueName = fsQueue.getName();
if (allocConf.isReservable(queueName)) {
planQueues.add(queueName);
}
}
return planQueues;
}
} }

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ReservationQueueConfiguration {
private long reservationWindow;
private long enforcementWindow;
private String reservationAdmissionPolicy;
private String reservationAgent;
private String planner;
private boolean showReservationAsQueues;
private boolean moveOnExpiry;
private float avgOverTimeMultiplier;
private float maxOverTimeMultiplier;
public ReservationQueueConfiguration() {
this.reservationWindow = ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_WINDOW;
this.enforcementWindow = ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW;
this.reservationAdmissionPolicy = ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_ADMISSION_POLICY;
this.reservationAgent = ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_AGENT_NAME;
this.planner = ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_PLANNER_NAME;
this.showReservationAsQueues = ReservationSchedulerConfiguration
.DEFAULT_SHOW_RESERVATIONS_AS_QUEUES;
this.moveOnExpiry = ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_MOVE_ON_EXPIRY;
this.avgOverTimeMultiplier = ReservationSchedulerConfiguration
.DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
this.maxOverTimeMultiplier = ReservationSchedulerConfiguration
.DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}
public long getReservationWindowMsec() {
return reservationWindow;
}
public long getEnforcementWindowMsec() {
return enforcementWindow;
}
public boolean shouldShowReservationAsQueues() {
return showReservationAsQueues;
}
public boolean shouldMoveOnExpiry() {
return moveOnExpiry;
}
public String getReservationAdmissionPolicy() {
return reservationAdmissionPolicy;
}
public String getReservationAgent() {
return reservationAgent;
}
public String getPlanner() {
return planner;
}
public float getAvgOverTimeMultiplier() {
return avgOverTimeMultiplier;
}
public float getMaxOverTimeMultiplier() {
return maxOverTimeMultiplier;
}
public void setPlanner(String planner) {
this.planner = planner;
}
public void setReservationAdmissionPolicy(String reservationAdmissionPolicy) {
this.reservationAdmissionPolicy = reservationAdmissionPolicy;
}
public void setReservationAgent(String reservationAgent) {
this.reservationAgent = reservationAgent;
}
}

View File

@ -94,9 +94,14 @@ public class FairSchedulerQueueInfo {
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory(); fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
maxApps = allocConf.getQueueMaxApps(queueName); maxApps = allocConf.getQueueMaxApps(queueName);
Collection<FSQueue> children = queue.getChildQueues();
childQueues = new ArrayList<FairSchedulerQueueInfo>(); childQueues = new ArrayList<FairSchedulerQueueInfo>();
if (allocConf.isReservable(queueName) &&
!allocConf.getShowReservationAsQueues(queueName)) {
return;
}
Collection<FSQueue> children = queue.getChildQueues();
for (FSQueue child : children) { for (FSQueue child : children) {
if (child instanceof FSLeafQueue) { if (child instanceof FSLeafQueue) {
childQueues.add(new FairSchedulerLeafQueueInfo((FSLeafQueue)child, scheduler)); childQueues.add(new FairSchedulerLeafQueueInfo((FSLeafQueue)child, scheduler));

View File

@ -23,7 +23,9 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -102,6 +104,62 @@ public class ReservationSystemTestUtil {
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
} }
static void setupFSAllocationFile(String allocationFile)
throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("<weight>1</weight>");
out.println("</queue>");
out.println("<queue name=\"a\">");
out.println("<weight>1</weight>");
out.println("<queue name=\"a1\">");
out.println("<weight>3</weight>");
out.println("</queue>");
out.println("<queue name=\"a2\">");
out.println("<weight>7</weight>");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"dedicated\">");
out.println("<reservation></reservation>");
out.println("<weight>8</weight>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
}
static void updateFSAllocationFile(String allocationFile)
throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("<weight>5</weight>");
out.println("</queue>");
out.println("<queue name=\"a\">");
out.println("<weight>5</weight>");
out.println("<queue name=\"a1\">");
out.println("<weight>3</weight>");
out.println("</queue>");
out.println("<queue name=\"a2\">");
out.println("<weight>7</weight>");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"dedicated\">");
out.println("<reservation></reservation>");
out.println("<weight>80</weight>");
out.println("</queue>");
out.println("<queue name=\"reservation\">");
out.println("<reservation></reservation>");
out.println("<weight>10</weight>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CapacityScheduler mockCapacityScheduler(int numContainers) public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException { throws IOException {

View File

@ -0,0 +1,151 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.mockito.Mockito.when;
public class TestFairReservationSystem extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
return conf;
}
@Before
public void setup() throws IOException {
conf = createConfiguration();
}
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
conf = null;
}
@Test
public void testFairReservationSystemInitialize() throws IOException {
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
RMContext mockRMContext = testUtil.createRMContext(conf);
setupFairScheduler(testUtil, mockRMContext);
FairReservationSystem reservationSystem = new FairReservationSystem();
reservationSystem.setRMContext(mockRMContext);
try {
reservationSystem.reinitialize(scheduler.getConf(), mockRMContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
testUtil.getFullReservationQueueName());
}
@Test
public void testFairReservationSystemReinitialize() throws IOException {
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
RMContext mockContext = testUtil.createRMContext(conf);
setupFairScheduler(testUtil, mockContext);
FairReservationSystem reservationSystem = new FairReservationSystem();
reservationSystem.setRMContext(mockContext);
try {
reservationSystem.reinitialize(scheduler.getConf(), mockContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
// Assert queue in original config
final String planQNam = testUtil.getFullReservationQueueName();
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
planQNam);
// Dynamically add a plan
ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE);
scheduler.reinitialize(conf, mockContext);
try {
reservationSystem.reinitialize(conf, mockContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
String newQueue = "root.reservation";
ReservationSystemTestUtil.validateNewReservationQueue
(reservationSystem, newQueue);
}
private void setupFairScheduler(ReservationSystemTestUtil testUtil,
RMContext rmContext) throws
IOException {
scheduler = new FairScheduler();
scheduler.setRMContext(rmContext);
int numContainers = 10;
when(rmContext.getScheduler()).thenReturn(scheduler);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
Resource resource = testUtil.calculateClusterResource(numContainers);
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
}
}

View File

@ -27,6 +27,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
@ -550,6 +551,86 @@ public class TestAllocationFileLoaderService {
allocLoader.reloadAllocations(); allocLoader.reloadAllocations();
} }
@Test
public void testReservableQueue() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"reservable\">");
out.println("<reservation>");
out.println("</reservation>");
out.println("</queue>");
out.println("<queue name=\"other\">");
out.println("</queue>");
out.println("<reservation-agent>DummyAgentName</reservation-agent>");
out.println("<reservation-policy>AnyAdmissionPolicy</reservation-policy>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
String reservableQueueName = "root.reservable";
String nonreservableQueueName = "root.other";
assertFalse(allocConf.isReservable(nonreservableQueueName));
assertTrue(allocConf.isReservable(reservableQueueName));
assertTrue(allocConf.getMoveOnExpiry(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
allocConf.getReservationWindow(reservableQueueName));
assertEquals(100, allocConf.getInstantaneousMaxCapacity
(reservableQueueName),
0.0001);
assertEquals(
"DummyAgentName",
allocConf.getReservationAgent(reservableQueueName));
assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001);
assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName));
assertEquals("AnyAdmissionPolicy",
allocConf.getReservationAdmissionPolicy(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_PLANNER_NAME,
allocConf.getReplanner(reservableQueueName));
assertEquals(ReservationSchedulerConfiguration
.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW,
allocConf.getEnforcementWindow(reservableQueueName));
}
/**
* Verify that you can't have dynamic user queue and reservable queue on
* the same queue
*/
@Test (expected = AllocationConfigurationException.class)
public void testReservableCannotBeCombinedWithDynamicUserQueue()
throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"notboth\" type=\"parent\" >");
out.println("<reservation>");
out.println("</reservation>");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
}
private class ReloadListener implements AllocationFileLoaderService.Listener { private class ReloadListener implements AllocationFileLoaderService.Listener {
public AllocationConfiguration allocConf; public AllocationConfiguration allocConf;