YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. (Anubhav Dhoot via kasha)
(cherry picked from commit a22ffc3188
)
This commit is contained in:
parent
e6c73a1c1a
commit
8ee40a1580
|
@ -35,6 +35,9 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
YARN-2203. [YARN-1492] Web UI for cache manager. (Chris Trezzo via kasha)
|
||||
|
||||
YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler.
|
||||
(Anubhav Dhoot via kasha)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2950. Change message to mandate, not suggest JS requirement on UI.
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.UTCClock;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -322,9 +323,10 @@ public abstract class AbstractReservationSystem extends AbstractService
|
|||
* @param scheduler the scheduler for which the reservation system is required
|
||||
*/
|
||||
public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
|
||||
// currently only capacity scheduler is supported
|
||||
if (scheduler instanceof CapacityScheduler) {
|
||||
return CapacityReservationSystem.class.getName();
|
||||
} else if (scheduler instanceof FairScheduler) {
|
||||
return FairReservationSystem.class.getName();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
||||
public class FairReservationSystem extends AbstractReservationSystem {
|
||||
|
||||
private FairScheduler fairScheduler;
|
||||
|
||||
public FairReservationSystem() {
|
||||
super(FairReservationSystem.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reinitialize(Configuration conf, RMContext rmContext)
|
||||
throws YarnException {
|
||||
// Validate if the scheduler is fair scheduler
|
||||
ResourceScheduler scheduler = rmContext.getScheduler();
|
||||
if (!(scheduler instanceof FairScheduler)) {
|
||||
throw new YarnRuntimeException("Class "
|
||||
+ scheduler.getClass().getCanonicalName() + " not instance of "
|
||||
+ FairScheduler.class.getCanonicalName());
|
||||
}
|
||||
fairScheduler = (FairScheduler) scheduler;
|
||||
this.conf = conf;
|
||||
super.reinitialize(conf, rmContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReservationSchedulerConfiguration
|
||||
getReservationSchedulerConfiguration() {
|
||||
return fairScheduler.getAllocationConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ResourceCalculator getResourceCalculator() {
|
||||
return fairScheduler.getResourceCalculator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected QueueMetrics getRootQueueMetrics() {
|
||||
return fairScheduler.getRootQueueMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Resource getMinAllocation() {
|
||||
return fairScheduler.getMinimumResourceCapability();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Resource getMaxAllocation() {
|
||||
return fairScheduler.getMaximumResourceCapability();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPlanQueuePath(String planQueueName) {
|
||||
return planQueueName; }
|
||||
|
||||
@Override
|
||||
protected Resource getPlanQueueCapacity(String planQueueName) {
|
||||
return fairScheduler.getQueueManager().getParentQueue(planQueueName, false)
|
||||
.getSteadyFairShare();
|
||||
}
|
||||
|
||||
}
|
|
@ -27,12 +27,13 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public class AllocationConfiguration {
|
||||
public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
|
||||
private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
|
||||
|
||||
|
@ -76,6 +77,8 @@ public class AllocationConfiguration {
|
|||
// preempt other queues' tasks.
|
||||
private final Map<String, Float> fairSharePreemptionThresholds;
|
||||
|
||||
private final Set<String> reservableQueues;
|
||||
|
||||
private final Map<String, SchedulingPolicy> schedulingPolicies;
|
||||
|
||||
private final SchedulingPolicy defaultSchedulingPolicy;
|
||||
|
@ -87,7 +90,10 @@ public class AllocationConfiguration {
|
|||
//Configured queues in the alloc xml
|
||||
@VisibleForTesting
|
||||
Map<FSQueueType, Set<String>> configuredQueues;
|
||||
|
||||
|
||||
// Reservation system configuration
|
||||
private ReservationQueueConfiguration globalReservationQueueConfig;
|
||||
|
||||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||
Map<String, Resource> maxQueueResources,
|
||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||
|
@ -101,7 +107,9 @@ public class AllocationConfiguration {
|
|||
Map<String, Float> fairSharePreemptionThresholds,
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
QueuePlacementPolicy placementPolicy,
|
||||
Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
Map<FSQueueType, Set<String>> configuredQueues,
|
||||
ReservationQueueConfiguration globalReservationQueueConfig,
|
||||
Set<String> reservableQueues) {
|
||||
this.minQueueResources = minQueueResources;
|
||||
this.maxQueueResources = maxQueueResources;
|
||||
this.queueMaxApps = queueMaxApps;
|
||||
|
@ -117,6 +125,8 @@ public class AllocationConfiguration {
|
|||
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
|
||||
this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
|
||||
this.queueAcls = queueAcls;
|
||||
this.reservableQueues = reservableQueues;
|
||||
this.globalReservationQueueConfig = globalReservationQueueConfig;
|
||||
this.placementPolicy = placementPolicy;
|
||||
this.configuredQueues = configuredQueues;
|
||||
}
|
||||
|
@ -137,6 +147,7 @@ public class AllocationConfiguration {
|
|||
fairSharePreemptionThresholds = new HashMap<String, Float>();
|
||||
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
|
||||
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
|
||||
reservableQueues = new HashSet<>();
|
||||
configuredQueues = new HashMap<FSQueueType, Set<String>>();
|
||||
for (FSQueueType queueType : FSQueueType.values()) {
|
||||
configuredQueues.put(queueType, new HashSet<String>());
|
||||
|
@ -262,4 +273,54 @@ public class AllocationConfiguration {
|
|||
public QueuePlacementPolicy getPlacementPolicy() {
|
||||
return placementPolicy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReservable(String queue) {
|
||||
return reservableQueues.contains(queue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReservationWindow(String queue) {
|
||||
return globalReservationQueueConfig.getReservationWindowMsec();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getAverageCapacity(String queue) {
|
||||
return globalReservationQueueConfig.getAvgOverTimeMultiplier() * 100;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getInstantaneousMaxCapacity(String queue) {
|
||||
return globalReservationQueueConfig.getMaxOverTimeMultiplier() * 100;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getReservationAdmissionPolicy(String queue) {
|
||||
return globalReservationQueueConfig.getReservationAdmissionPolicy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getReservationAgent(String queue) {
|
||||
return globalReservationQueueConfig.getReservationAgent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getShowReservationAsQueues(String queue) {
|
||||
return globalReservationQueueConfig.shouldShowReservationAsQueues();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getReplanner(String queue) {
|
||||
return globalReservationQueueConfig.getPlanner();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getMoveOnExpiry(String queue) {
|
||||
return globalReservationQueueConfig.shouldMoveOnExpiry();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEnforcementWindow(String queue) {
|
||||
return globalReservationQueueConfig.getEnforcementWindowMsec();
|
||||
}
|
||||
}
|
|
@ -86,7 +86,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
|
||||
private Thread reloadThread;
|
||||
private volatile boolean running = true;
|
||||
|
||||
|
||||
public AllocationFileLoaderService() {
|
||||
this(new SystemClock());
|
||||
}
|
||||
|
@ -222,6 +222,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
new HashMap<String, Float>();
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
||||
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||
Set<String> reservableQueues = new HashSet<String>();
|
||||
int userMaxAppsDefault = Integer.MAX_VALUE;
|
||||
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||
float queueMaxAMShareDefault = 0.5f;
|
||||
|
@ -230,6 +231,11 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
float defaultFairSharePreemptionThreshold = 0.5f;
|
||||
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
|
||||
|
||||
// Reservation global configuration knobs
|
||||
String planner = null;
|
||||
String reservationAgent = null;
|
||||
String reservationAdmissionPolicy = null;
|
||||
|
||||
QueuePlacementPolicy newPlacementPolicy = null;
|
||||
|
||||
// Remember all queue names so we can display them on web UI, etc.
|
||||
|
@ -317,6 +323,15 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
defaultSchedPolicy = SchedulingPolicy.parse(text);
|
||||
} else if ("queuePlacementPolicy".equals(element.getTagName())) {
|
||||
placementPolicyElement = element;
|
||||
} else if ("reservation-planner".equals(element.getTagName())) {
|
||||
String text = ((Text) element.getFirstChild()).getData().trim();
|
||||
planner = text;
|
||||
} else if ("reservation-agent".equals(element.getTagName())) {
|
||||
String text = ((Text) element.getFirstChild()).getData().trim();
|
||||
reservationAgent = text;
|
||||
} else if ("reservation-policy".equals(element.getTagName())) {
|
||||
String text = ((Text) element.getFirstChild()).getData().trim();
|
||||
reservationAdmissionPolicy = text;
|
||||
} else {
|
||||
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
||||
}
|
||||
|
@ -337,7 +352,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
loadQueue(parent, element, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
||||
fairSharePreemptionThresholds, queueAcls, configuredQueues);
|
||||
fairSharePreemptionThresholds, queueAcls, configuredQueues,
|
||||
reservableQueues);
|
||||
}
|
||||
|
||||
// Load placement policy and pass it configured queues
|
||||
|
@ -366,13 +382,27 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
defaultFairSharePreemptionThreshold);
|
||||
}
|
||||
|
||||
ReservationQueueConfiguration globalReservationQueueConfig = new
|
||||
ReservationQueueConfiguration();
|
||||
if (planner != null) {
|
||||
globalReservationQueueConfig.setPlanner(planner);
|
||||
}
|
||||
if (reservationAdmissionPolicy != null) {
|
||||
globalReservationQueueConfig.setReservationAdmissionPolicy
|
||||
(reservationAdmissionPolicy);
|
||||
}
|
||||
if (reservationAgent != null) {
|
||||
globalReservationQueueConfig.setReservationAgent(reservationAgent);
|
||||
}
|
||||
|
||||
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
|
||||
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
|
||||
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
|
||||
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
|
||||
minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
||||
fairSharePreemptionThresholds, queueAcls,
|
||||
newPlacementPolicy, configuredQueues);
|
||||
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
|
||||
reservableQueues);
|
||||
|
||||
lastSuccessfulReload = clock.getTime();
|
||||
lastReloadAttemptFailed = false;
|
||||
|
@ -392,8 +422,9 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
Map<String, Long> minSharePreemptionTimeouts,
|
||||
Map<String, Long> fairSharePreemptionTimeouts,
|
||||
Map<String, Float> fairSharePreemptionThresholds,
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
Map<FSQueueType, Set<String>> configuredQueues)
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
Map<FSQueueType, Set<String>> configuredQueues,
|
||||
Set<String> reservableQueues)
|
||||
throws AllocationConfigurationException {
|
||||
String queueName = element.getAttribute("name");
|
||||
|
||||
|
@ -460,14 +491,17 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
} else if ("aclAdministerApps".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData();
|
||||
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
|
||||
} else if ("reservation".equals(field.getTagName())) {
|
||||
isLeaf = false;
|
||||
reservableQueues.add(queueName);
|
||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||
} else if ("queue".endsWith(field.getTagName()) ||
|
||||
"pool".equals(field.getTagName())) {
|
||||
loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||
queuePolicies, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
|
||||
queueAcls, configuredQueues);
|
||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||
queueAcls, configuredQueues, reservableQueues);
|
||||
isLeaf = false;
|
||||
}
|
||||
}
|
||||
|
@ -479,6 +513,13 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
} else {
|
||||
configuredQueues.get(FSQueueType.LEAF).add(queueName);
|
||||
}
|
||||
} else {
|
||||
if ("parent".equals(element.getAttribute("type"))) {
|
||||
throw new AllocationConfigurationException("Both <reservation> and " +
|
||||
"type=\"parent\" found for queue " + queueName + " which is " +
|
||||
"unsupported");
|
||||
}
|
||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||
}
|
||||
queueAcls.put(queueName, acls);
|
||||
if (maxQueueResources.containsKey(queueName) &&
|
||||
|
|
|
@ -1568,4 +1568,16 @@ public class FairScheduler extends
|
|||
return EnumSet
|
||||
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getPlanQueues() throws YarnException {
|
||||
Set<String> planQueues = new HashSet<String>();
|
||||
for (FSQueue fsQueue : queueMgr.getQueues()) {
|
||||
String queueName = fsQueue.getName();
|
||||
if (allocConf.isReservable(queueName)) {
|
||||
planQueues.add(queueName);
|
||||
}
|
||||
}
|
||||
return planQueues;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public class ReservationQueueConfiguration {
|
||||
private long reservationWindow;
|
||||
private long enforcementWindow;
|
||||
private String reservationAdmissionPolicy;
|
||||
private String reservationAgent;
|
||||
private String planner;
|
||||
private boolean showReservationAsQueues;
|
||||
private boolean moveOnExpiry;
|
||||
private float avgOverTimeMultiplier;
|
||||
private float maxOverTimeMultiplier;
|
||||
|
||||
public ReservationQueueConfiguration() {
|
||||
this.reservationWindow = ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_WINDOW;
|
||||
this.enforcementWindow = ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW;
|
||||
this.reservationAdmissionPolicy = ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_ADMISSION_POLICY;
|
||||
this.reservationAgent = ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_AGENT_NAME;
|
||||
this.planner = ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_PLANNER_NAME;
|
||||
this.showReservationAsQueues = ReservationSchedulerConfiguration
|
||||
.DEFAULT_SHOW_RESERVATIONS_AS_QUEUES;
|
||||
this.moveOnExpiry = ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_MOVE_ON_EXPIRY;
|
||||
this.avgOverTimeMultiplier = ReservationSchedulerConfiguration
|
||||
.DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
|
||||
this.maxOverTimeMultiplier = ReservationSchedulerConfiguration
|
||||
.DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
|
||||
}
|
||||
|
||||
public long getReservationWindowMsec() {
|
||||
return reservationWindow;
|
||||
}
|
||||
|
||||
public long getEnforcementWindowMsec() {
|
||||
return enforcementWindow;
|
||||
}
|
||||
|
||||
public boolean shouldShowReservationAsQueues() {
|
||||
return showReservationAsQueues;
|
||||
}
|
||||
|
||||
public boolean shouldMoveOnExpiry() {
|
||||
return moveOnExpiry;
|
||||
}
|
||||
|
||||
public String getReservationAdmissionPolicy() {
|
||||
return reservationAdmissionPolicy;
|
||||
}
|
||||
|
||||
public String getReservationAgent() {
|
||||
return reservationAgent;
|
||||
}
|
||||
|
||||
public String getPlanner() {
|
||||
return planner;
|
||||
}
|
||||
|
||||
public float getAvgOverTimeMultiplier() {
|
||||
return avgOverTimeMultiplier;
|
||||
}
|
||||
|
||||
public float getMaxOverTimeMultiplier() {
|
||||
return maxOverTimeMultiplier;
|
||||
}
|
||||
|
||||
public void setPlanner(String planner) {
|
||||
this.planner = planner;
|
||||
}
|
||||
|
||||
public void setReservationAdmissionPolicy(String reservationAdmissionPolicy) {
|
||||
this.reservationAdmissionPolicy = reservationAdmissionPolicy;
|
||||
}
|
||||
|
||||
public void setReservationAgent(String reservationAgent) {
|
||||
this.reservationAgent = reservationAgent;
|
||||
}
|
||||
}
|
|
@ -88,9 +88,14 @@ public class FairSchedulerQueueInfo {
|
|||
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
|
||||
|
||||
maxApps = allocConf.getQueueMaxApps(queueName);
|
||||
|
||||
Collection<FSQueue> children = queue.getChildQueues();
|
||||
|
||||
childQueues = new ArrayList<FairSchedulerQueueInfo>();
|
||||
if (allocConf.isReservable(queueName) &&
|
||||
!allocConf.getShowReservationAsQueues(queueName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
Collection<FSQueue> children = queue.getChildQueues();
|
||||
for (FSQueue child : children) {
|
||||
if (child instanceof FSLeafQueue) {
|
||||
childQueues.add(new FairSchedulerLeafQueueInfo((FSLeafQueue)child, scheduler));
|
||||
|
|
|
@ -23,7 +23,9 @@ import static org.mockito.Mockito.doReturn;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
@ -102,6 +104,62 @@ public class ReservationSystemTestUtil {
|
|||
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
|
||||
}
|
||||
|
||||
static void setupFSAllocationFile(String allocationFile)
|
||||
throws IOException {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"default\">");
|
||||
out.println("<weight>1</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"a\">");
|
||||
out.println("<weight>1</weight>");
|
||||
out.println("<queue name=\"a1\">");
|
||||
out.println("<weight>3</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"a2\">");
|
||||
out.println("<weight>7</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"dedicated\">");
|
||||
out.println("<reservation></reservation>");
|
||||
out.println("<weight>8</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
}
|
||||
|
||||
static void updateFSAllocationFile(String allocationFile)
|
||||
throws IOException {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"default\">");
|
||||
out.println("<weight>5</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"a\">");
|
||||
out.println("<weight>5</weight>");
|
||||
out.println("<queue name=\"a1\">");
|
||||
out.println("<weight>3</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"a2\">");
|
||||
out.println("<weight>7</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"dedicated\">");
|
||||
out.println("<reservation></reservation>");
|
||||
out.println("<weight>80</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"reservation\">");
|
||||
out.println("<reservation></reservation>");
|
||||
out.println("<weight>10</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public CapacityScheduler mockCapacityScheduler(int numContainers)
|
||||
throws IOException {
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/*******************************************************************************
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*******************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestFairReservationSystem extends FairSchedulerTestBase {
|
||||
private final static String ALLOC_FILE = new File(TEST_DIR,
|
||||
TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
Configuration conf = super.createConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
conf = createConfiguration();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
if (resourceManager != null) {
|
||||
resourceManager.stop();
|
||||
resourceManager = null;
|
||||
}
|
||||
conf = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairReservationSystemInitialize() throws IOException {
|
||||
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
|
||||
|
||||
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
|
||||
|
||||
// Setup
|
||||
RMContext mockRMContext = testUtil.createRMContext(conf);
|
||||
setupFairScheduler(testUtil, mockRMContext);
|
||||
|
||||
FairReservationSystem reservationSystem = new FairReservationSystem();
|
||||
reservationSystem.setRMContext(mockRMContext);
|
||||
|
||||
try {
|
||||
reservationSystem.reinitialize(scheduler.getConf(), mockRMContext);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
||||
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
|
||||
testUtil.getFullReservationQueueName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairReservationSystemReinitialize() throws IOException {
|
||||
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
|
||||
|
||||
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
|
||||
|
||||
// Setup
|
||||
RMContext mockContext = testUtil.createRMContext(conf);
|
||||
setupFairScheduler(testUtil, mockContext);
|
||||
|
||||
FairReservationSystem reservationSystem = new FairReservationSystem();
|
||||
reservationSystem.setRMContext(mockContext);
|
||||
|
||||
try {
|
||||
reservationSystem.reinitialize(scheduler.getConf(), mockContext);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
||||
// Assert queue in original config
|
||||
final String planQNam = testUtil.getFullReservationQueueName();
|
||||
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
|
||||
planQNam);
|
||||
|
||||
// Dynamically add a plan
|
||||
ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE);
|
||||
scheduler.reinitialize(conf, mockContext);
|
||||
|
||||
try {
|
||||
reservationSystem.reinitialize(conf, mockContext);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
||||
String newQueue = "root.reservation";
|
||||
ReservationSystemTestUtil.validateNewReservationQueue
|
||||
(reservationSystem, newQueue);
|
||||
}
|
||||
|
||||
private void setupFairScheduler(ReservationSystemTestUtil testUtil,
|
||||
RMContext rmContext) throws
|
||||
IOException {
|
||||
|
||||
scheduler = new FairScheduler();
|
||||
scheduler.setRMContext(rmContext);
|
||||
|
||||
int numContainers = 10;
|
||||
when(rmContext.getScheduler()).thenReturn(scheduler);
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, rmContext);
|
||||
|
||||
Resource resource = testUtil.calculateClusterResource(numContainers);
|
||||
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ import java.util.List;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
|
@ -550,6 +551,86 @@ public class TestAllocationFileLoaderService {
|
|||
allocLoader.reloadAllocations();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testReservableQueue() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"reservable\">");
|
||||
out.println("<reservation>");
|
||||
out.println("</reservation>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"other\">");
|
||||
out.println("</queue>");
|
||||
out.println("<reservation-agent>DummyAgentName</reservation-agent>");
|
||||
out.println("<reservation-policy>AnyAdmissionPolicy</reservation-policy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||
allocLoader.init(conf);
|
||||
ReloadListener confHolder = new ReloadListener();
|
||||
allocLoader.setReloadListener(confHolder);
|
||||
allocLoader.reloadAllocations();
|
||||
|
||||
AllocationConfiguration allocConf = confHolder.allocConf;
|
||||
String reservableQueueName = "root.reservable";
|
||||
String nonreservableQueueName = "root.other";
|
||||
assertFalse(allocConf.isReservable(nonreservableQueueName));
|
||||
assertTrue(allocConf.isReservable(reservableQueueName));
|
||||
|
||||
assertTrue(allocConf.getMoveOnExpiry(reservableQueueName));
|
||||
assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
|
||||
allocConf.getReservationWindow(reservableQueueName));
|
||||
assertEquals(100, allocConf.getInstantaneousMaxCapacity
|
||||
(reservableQueueName),
|
||||
0.0001);
|
||||
assertEquals(
|
||||
"DummyAgentName",
|
||||
allocConf.getReservationAgent(reservableQueueName));
|
||||
assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001);
|
||||
assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName));
|
||||
assertEquals("AnyAdmissionPolicy",
|
||||
allocConf.getReservationAdmissionPolicy(reservableQueueName));
|
||||
assertEquals(ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_PLANNER_NAME,
|
||||
allocConf.getReplanner(reservableQueueName));
|
||||
assertEquals(ReservationSchedulerConfiguration
|
||||
.DEFAULT_RESERVATION_ENFORCEMENT_WINDOW,
|
||||
allocConf.getEnforcementWindow(reservableQueueName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that you can't have dynamic user queue and reservable queue on
|
||||
* the same queue
|
||||
*/
|
||||
@Test (expected = AllocationConfigurationException.class)
|
||||
public void testReservableCannotBeCombinedWithDynamicUserQueue()
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"notboth\" type=\"parent\" >");
|
||||
out.println("<reservation>");
|
||||
out.println("</reservation>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||
allocLoader.init(conf);
|
||||
ReloadListener confHolder = new ReloadListener();
|
||||
allocLoader.setReloadListener(confHolder);
|
||||
allocLoader.reloadAllocations();
|
||||
}
|
||||
|
||||
private class ReloadListener implements AllocationFileLoaderService.Listener {
|
||||
public AllocationConfiguration allocConf;
|
||||
|
||||
|
|
Loading…
Reference in New Issue