YARN-2690. [YARN-2574] Make ReservationSystem and its dependent classes independent of Scheduler type. (Anubhav Dhoot via kasha)

(cherry picked from commit 2fce6d6141)
This commit is contained in:
Karthik Kambatla 2014-11-17 16:45:57 -08:00
parent 0b9a1a3e03
commit 8df4c04c3a
16 changed files with 436 additions and 226 deletions

View File

@ -21,6 +21,7 @@ Release 2.7.0 - UNRELEASED
YARN-2236. [YARN-1492] Shared Cache uploader service on the Node YARN-2236. [YARN-1492] Shared Cache uploader service on the Node
Manager. (Chris Trezzo and Sanjin Lee via kasha) Manager. (Chris Trezzo and Sanjin Lee via kasha)
IMPROVEMENTS IMPROVEMENTS
YARN-1979. TestDirectoryCollection fails when the umask is unusual. YARN-1979. TestDirectoryCollection fails when the umask is unusual.
@ -44,6 +45,9 @@ Release 2.7.0 - UNRELEASED
YARN-2780. Log aggregated resource allocation in rm-appsummary.log (Eric YARN-2780. Log aggregated resource allocation in rm-appsummary.log (Eric
Payne via jlowe) Payne via jlowe)
YARN-2690. [YARN-2574] Make ReservationSystem and its dependent classes
independent of Scheduler type. (Anubhav Dhoot via kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -34,15 +34,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -326,13 +329,103 @@ public abstract class AbstractReservationSystem extends AbstractService
return null; return null;
} }
protected abstract Plan initializePlan(String planQueueName) protected Plan initializePlan(String planQueueName) throws YarnException {
throws YarnException; String planQueuePath = getPlanQueuePath(planQueueName);
SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath);
protected abstract Planner getReplanner(String planQueueName); adPolicy.init(planQueuePath, getReservationSchedulerConfiguration());
// Calculate the max plan capacity
protected abstract ReservationAgent getAgent(String queueName); Resource minAllocation = getMinAllocation();
Resource maxAllocation = getMaxAllocation();
protected abstract SharingPolicy getAdmissionPolicy(String queueName); ResourceCalculator rescCalc = getResourceCalculator();
Resource totCap = getPlanQueueCapacity(planQueueName);
Plan plan =
new InMemoryPlan(getRootQueueMetrics(), adPolicy,
getAgent(planQueuePath), totCap, planStepSize, rescCalc,
minAllocation, maxAllocation, planQueueName,
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
.getMoveOnExpiry(planQueuePath));
LOG.info("Intialized plan {0} based on reservable queue {1}",
plan.toString(), planQueueName);
return plan;
}
protected Planner getReplanner(String planQueueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String plannerClassName = reservationConfig.getReplanner(planQueueName);
LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ planQueueName);
try {
Class<?> plannerClazz = conf.getClassByName(plannerClassName);
if (Planner.class.isAssignableFrom(plannerClazz)) {
Planner planner =
(Planner) ReflectionUtils.newInstance(plannerClazz, conf);
planner.init(planQueueName, reservationConfig);
return planner;
} else {
throw new YarnRuntimeException("Class: " + plannerClazz
+ " not instance of " + Planner.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Planner: "
+ plannerClassName + " for queue: " + planQueueName, e);
}
}
protected ReservationAgent getAgent(String queueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String agentClassName = reservationConfig.getReservationAgent(queueName);
LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
try {
Class<?> agentClazz = conf.getClassByName(agentClassName);
if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + agentClassName
+ " not instance of " + ReservationAgent.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Agent: "
+ agentClassName + " for queue: " + queueName, e);
}
}
protected SharingPolicy getAdmissionPolicy(String queueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String admissionPolicyClassName =
reservationConfig.getReservationAdmissionPolicy(queueName);
LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+ " for queue: " + queueName);
try {
Class<?> admissionPolicyClazz =
conf.getClassByName(admissionPolicyClassName);
if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
return (SharingPolicy) ReflectionUtils.newInstance(
admissionPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+ " not instance of " + SharingPolicy.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
+ admissionPolicyClassName + " for queue: " + queueName, e);
}
}
protected abstract ReservationSchedulerConfiguration
getReservationSchedulerConfiguration();
protected abstract String getPlanQueuePath(String planQueueName);
protected abstract Resource getPlanQueueCapacity(String planQueueName);
protected abstract Resource getMinAllocation();
protected abstract Resource getMaxAllocation();
protected abstract ResourceCalculator getResourceCalculator();
protected abstract QueueMetrics getRootQueueMetrics();
} }

View File

@ -21,13 +21,11 @@ import java.util.Date;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
@ -52,7 +50,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Unstable @Unstable
public class CapacityOverTimePolicy implements SharingPolicy { public class CapacityOverTimePolicy implements SharingPolicy {
private CapacitySchedulerConfiguration conf; private ReservationSchedulerConfiguration conf;
private long validWindow; private long validWindow;
private float maxInst; private float maxInst;
private float maxAvg; private float maxAvg;
@ -61,13 +59,9 @@ public class CapacityOverTimePolicy implements SharingPolicy {
// configuration structure of the schedulers (e.g., SchedulerConfiguration) // configuration structure of the schedulers (e.g., SchedulerConfiguration)
// it should be easy to remove this limitation // it should be easy to remove this limitation
@Override @Override
public void init(String reservationQueuePath, Configuration conf) { public void init(String reservationQueuePath,
if (!(conf instanceof CapacitySchedulerConfiguration)) { ReservationSchedulerConfiguration conf) {
throw new IllegalArgumentException("Unexpected conf type: " this.conf = conf;
+ conf.getClass().getSimpleName() + " only supported conf is: "
+ CapacitySchedulerConfiguration.class.getSimpleName());
}
this.conf = (CapacitySchedulerConfiguration) conf;
validWindow = this.conf.getReservationWindow(reservationQueuePath); validWindow = this.conf.getReservationWindow(reservationQueuePath);
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;

View File

@ -21,15 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -67,95 +66,45 @@ public class CapacityReservationSystem extends AbstractReservationSystem {
} }
@Override @Override
protected Plan initializePlan(String planQueueName) throws YarnException { protected Resource getMinAllocation() {
SharingPolicy adPolicy = getAdmissionPolicy(planQueueName); return capScheduler.getMinimumResourceCapability();
String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath(); }
adPolicy.init(planQueuePath, capScheduler.getConfiguration());
@Override
protected Resource getMaxAllocation() {
return capScheduler.getMaximumResourceCapability();
}
@Override
protected ResourceCalculator getResourceCalculator() {
return capScheduler.getResourceCalculator();
}
@Override
protected QueueMetrics getRootQueueMetrics() {
return capScheduler.getRootQueueMetrics();
}
@Override
protected String getPlanQueuePath(String planQueueName) {
return capScheduler.getQueue(planQueueName).getQueuePath();
}
@Override
protected Resource getPlanQueueCapacity(String planQueueName) {
Resource minAllocation = getMinAllocation();
ResourceCalculator rescCalc = getResourceCalculator();
CSQueue planQueue = capScheduler.getQueue(planQueueName); CSQueue planQueue = capScheduler.getQueue(planQueueName);
// Calculate the max plan capacity return rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(),
Resource minAllocation = capScheduler.getMinimumResourceCapability();
ResourceCalculator rescCalc = capScheduler.getResourceCalculator();
Resource totCap =
rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(),
planQueue.getAbsoluteCapacity(), minAllocation); planQueue.getAbsoluteCapacity(), minAllocation);
Plan plan =
new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy,
getAgent(planQueuePath), totCap, planStepSize, rescCalc,
minAllocation, capScheduler.getMaximumResourceCapability(),
planQueueName, getReplanner(planQueuePath), capScheduler
.getConfiguration().getMoveOnExpiry(planQueuePath));
LOG.info("Intialized plan {0} based on reservable queue {1}",
plan.toString(), planQueueName);
return plan;
} }
@Override @Override
protected Planner getReplanner(String planQueueName) { protected ReservationSchedulerConfiguration
CapacitySchedulerConfiguration capSchedulerConfig = getReservationSchedulerConfiguration() {
capScheduler.getConfiguration(); return capScheduler.getConfiguration();
String plannerClassName = capSchedulerConfig.getReplanner(planQueueName);
LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ planQueueName);
try {
Class<?> plannerClazz =
capSchedulerConfig.getClassByName(plannerClassName);
if (Planner.class.isAssignableFrom(plannerClazz)) {
Planner planner =
(Planner) ReflectionUtils.newInstance(plannerClazz, conf);
planner.init(planQueueName, capSchedulerConfig);
return planner;
} else {
throw new YarnRuntimeException("Class: " + plannerClazz
+ " not instance of " + Planner.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Planner: "
+ plannerClassName + " for queue: " + planQueueName, e);
}
}
@Override
protected ReservationAgent getAgent(String queueName) {
CapacitySchedulerConfiguration capSchedulerConfig =
capScheduler.getConfiguration();
String agentClassName = capSchedulerConfig.getReservationAgent(queueName);
LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
try {
Class<?> agentClazz = capSchedulerConfig.getClassByName(agentClassName);
if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + agentClassName
+ " not instance of " + ReservationAgent.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Agent: "
+ agentClassName + " for queue: " + queueName, e);
}
}
@Override
protected SharingPolicy getAdmissionPolicy(String queueName) {
CapacitySchedulerConfiguration capSchedulerConfig =
capScheduler.getConfiguration();
String admissionPolicyClassName =
capSchedulerConfig.getReservationAdmissionPolicy(queueName);
LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+ " for queue: " + queueName);
try {
Class<?> admissionPolicyClazz =
capSchedulerConfig.getClassByName(admissionPolicyClassName);
if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
return (SharingPolicy) ReflectionUtils.newInstance(
admissionPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+ " not instance of " + SharingPolicy.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
+ admissionPolicyClassName + " for queue: " + queueName, e);
}
} }
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@ -85,7 +84,8 @@ public class NoOverCommitPolicy implements SharingPolicy {
} }
@Override @Override
public void init(String planQueuePath, Configuration conf) { public void init(String planQueuePath,
ReservationSchedulerConfiguration conf) {
// nothing to do for this policy // nothing to do for this policy
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
@ -44,5 +43,5 @@ public interface Planner {
* @param planQueueName the name of the queue for this plan * @param planQueueName the name of the queue for this plan
* @param conf the scheduler configuration * @param conf the scheduler configuration
*/ */
void init(String planQueueName, Configuration conf); void init(String planQueueName, ReservationSchedulerConfiguration conf);
} }

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
public abstract class ReservationSchedulerConfiguration extends Configuration {
@InterfaceAudience.Private
public static final long DEFAULT_RESERVATION_WINDOW = 24*60*60*1000; // 1 day in msec
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_AGENT_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
@InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
@InterfaceAudience.Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
// default to 1h lookahead enforcement
@InterfaceAudience.Private
public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 60*60*1000;
// 1 hour
@InterfaceAudience.Private
public static final boolean DEFAULT_SHOW_RESERVATIONS_AS_QUEUES = false;
@InterfaceAudience.Private
public static final float DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER = 1;
public ReservationSchedulerConfiguration() { super(); }
public ReservationSchedulerConfiguration(
Configuration configuration) {
super(configuration);
}
/**
* Checks if the queue participates in reservation based scheduling
* @param queue
* @return true if the queue participates in reservation based scheduling
*/
public abstract boolean isReservable(String queue);
/**
* Gets the length of time in milliseconds for which the {@link SharingPolicy}
* checks for validity
* @param queue name of the queue
* @return length in time in milliseconds for which to check the
* {@link SharingPolicy}
*/
public long getReservationWindow(String queue) {
return DEFAULT_RESERVATION_WINDOW;
}
/**
* Gets the average allowed capacity which will aggregated over the
* {@link ReservationSchedulerConfiguration#getReservationWindow} by the
* the {@link SharingPolicy} to check aggregate used capacity
* @param queue name of the queue
* @return average capacity allowed by the {@link SharingPolicy}
*/
public float getAverageCapacity(String queue) {
return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}
/**
* Gets the maximum capacity at any time that the {@link SharingPolicy} allows
* @param queue name of the queue
* @return maximum allowed capacity at any time
*/
public float getInstantaneousMaxCapacity(String queue) {
return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}
/**
* Gets the name of the {@link SharingPolicy} class associated with the queue
* @param queue name of the queue
* @return the class name of the {@link SharingPolicy}
*/
public String getReservationAdmissionPolicy(String queue) {
return DEFAULT_RESERVATION_ADMISSION_POLICY;
}
/**
* Gets the name of the {@link ReservationAgent} class associated with the
* queue
* @param queue name of the queue
* @return the class name of the {@link ReservationAgent}
*/
public String getReservationAgent(String queue) {
return DEFAULT_RESERVATION_AGENT_NAME;
}
/**
* Checks whether the reservation queues be hidden or visible
* @param queuePath name of the queue
* @return true if reservation queues should be visible
*/
public boolean getShowReservationAsQueues(String queuePath) {
return DEFAULT_SHOW_RESERVATIONS_AS_QUEUES;
}
/**
* Gets the name of the {@link Planner} class associated with the
* queue
* @param queue name of the queue
* @return the class name of the {@link Planner}
*/
public String getReplanner(String queue) {
return DEFAULT_RESERVATION_PLANNER_NAME;
}
/**
* Gets whether the applications should be killed or moved to the parent queue
* when the {@link ReservationDefinition} expires
* @param queue name of the queue
* @return true if application should be moved, false if they need to be
* killed
*/
public boolean getMoveOnExpiry(String queue) {
return DEFAULT_RESERVATION_MOVE_ON_EXPIRY;
}
/**
* Gets the time in milliseconds for which the {@link Planner} will verify
* the {@link Plan}s satisfy the constraints
* @param queue name of the queue
* @return the time in milliseconds for which to check constraints
*/
public long getEnforcementWindow(String queue) {
return DEFAULT_RESERVATION_ENFORCEMENT_WINDOW;
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/** /**
@ -38,7 +37,7 @@ public interface SharingPolicy {
* @param planQueuePath the name of the queue for this plan * @param planQueuePath the name of the queue for this plan
* @param conf the system configuration * @param conf the system configuration
*/ */
public void init(String planQueuePath, Configuration conf); public void init(String planQueuePath, ReservationSchedulerConfiguration conf);
/** /**
* This method runs the policy validation logic, and return true/false on * This method runs the policy validation logic, and return true/false on

View File

@ -25,11 +25,9 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -69,15 +67,9 @@ public class SimpleCapacityReplanner implements Planner {
} }
@Override @Override
public void init(String planQueueName, Configuration conf) { public void init(String planQueueName,
if (!(conf instanceof CapacitySchedulerConfiguration)) { ReservationSchedulerConfiguration conf) {
throw new IllegalArgumentException("Unexpected conf type: " this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+ conf.getClass().getSimpleName() + " only supported conf is: "
+ CapacitySchedulerConfiguration.class.getSimpleName());
}
this.lengthOfCheckZone =
((CapacitySchedulerConfiguration) conf)
.getEnforcementWindow(planQueueName);
} }
@Override @Override

View File

@ -41,13 +41,14 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
public class CapacitySchedulerConfiguration extends Configuration { public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(CapacitySchedulerConfiguration.class); LogFactory.getLog(CapacitySchedulerConfiguration.class);
@ -221,9 +222,6 @@ public class CapacitySchedulerConfiguration extends Configuration {
public static final String INSTANTANEOUS_MAX_CAPACITY = public static final String INSTANTANEOUS_MAX_CAPACITY =
"instantaneous-max-capacity"; "instantaneous-max-capacity";
@Private
public static final long DEFAULT_RESERVATION_WINDOW = 86400000L;
@Private @Private
public static final String RESERVATION_ADMISSION_POLICY = public static final String RESERVATION_ADMISSION_POLICY =
"reservation-policy"; "reservation-policy";
@ -235,36 +233,17 @@ public class CapacitySchedulerConfiguration extends Configuration {
public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
"show-reservations-as-queues"; "show-reservations-as-queues";
@Private
public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
@Private
public static final String DEFAULT_RESERVATION_AGENT_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
@Private @Private
public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
@Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
@Private @Private
public static final String RESERVATION_MOVE_ON_EXPIRY = public static final String RESERVATION_MOVE_ON_EXPIRY =
"reservation-move-on-expiry"; "reservation-move-on-expiry";
@Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
@Private @Private
public static final String RESERVATION_ENFORCEMENT_WINDOW = public static final String RESERVATION_ENFORCEMENT_WINDOW =
"reservation-enforcement-window"; "reservation-enforcement-window";
// default to 1h lookahead enforcement
@Private
public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000;
public CapacitySchedulerConfiguration() { public CapacitySchedulerConfiguration() {
this(new Configuration()); this(new Configuration());
} }
@ -721,6 +700,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
+ ", isReservableQueue=" + isReservable(queue)); + ", isReservableQueue=" + isReservable(queue));
} }
@Override
public long getReservationWindow(String queue) { public long getReservationWindow(String queue) {
long reservationWindow = long reservationWindow =
getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
@ -728,6 +708,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
return reservationWindow; return reservationWindow;
} }
@Override
public float getAverageCapacity(String queue) { public float getAverageCapacity(String queue) {
float avgCapacity = float avgCapacity =
getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
@ -735,6 +716,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
return avgCapacity; return avgCapacity;
} }
@Override
public float getInstantaneousMaxCapacity(String queue) { public float getInstantaneousMaxCapacity(String queue) {
float instMaxCapacity = float instMaxCapacity =
getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
@ -755,6 +737,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity); setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
} }
@Override
public String getReservationAdmissionPolicy(String queue) { public String getReservationAdmissionPolicy(String queue) {
String reservationPolicy = String reservationPolicy =
get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
@ -767,6 +750,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy); set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
} }
@Override
public String getReservationAgent(String queue) { public String getReservationAgent(String queue) {
String reservationAgent = String reservationAgent =
get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
@ -778,13 +762,16 @@ public class CapacitySchedulerConfiguration extends Configuration {
set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy); set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
} }
@Override
public boolean getShowReservationAsQueues(String queuePath) { public boolean getShowReservationAsQueues(String queuePath) {
boolean showReservationAsQueues = boolean showReservationAsQueues =
getBoolean(getQueuePrefix(queuePath) getBoolean(getQueuePrefix(queuePath)
+ RESERVATION_SHOW_RESERVATION_AS_QUEUE, false); + RESERVATION_SHOW_RESERVATION_AS_QUEUE,
DEFAULT_SHOW_RESERVATIONS_AS_QUEUES);
return showReservationAsQueues; return showReservationAsQueues;
} }
@Override
public String getReplanner(String queue) { public String getReplanner(String queue) {
String replanner = String replanner =
get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME, get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
@ -792,6 +779,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
return replanner; return replanner;
} }
@Override
public boolean getMoveOnExpiry(String queue) { public boolean getMoveOnExpiry(String queue) {
boolean killOnExpiry = boolean killOnExpiry =
getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY, getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
@ -799,6 +787,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
return killOnExpiry; return killOnExpiry;
} }
@Override
public long getEnforcementWindow(String queue) { public long getEnforcementWindow(String queue) {
long enforcementWindow = long enforcementWindow =
getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anySetOf;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -29,6 +30,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequest;
@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@ -61,6 +64,44 @@ public class ReservationSystemTestUtil {
return ReservationId.newInstance(rand.nextLong(), rand.nextLong()); return ReservationId.newInstance(rand.nextLong(), rand.nextLong());
} }
public static ReservationSchedulerConfiguration createConf(
String reservationQ, long timeWindow, float instConstraint,
float avgConstraint) {
ReservationSchedulerConfiguration conf = mock
(ReservationSchedulerConfiguration.class);
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
when(conf.getInstantaneousMaxCapacity(reservationQ)).thenReturn
(instConstraint);
when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint);
return conf;
}
public static void validateReservationQueue(
AbstractReservationSystem reservationSystem, String planQName) {
Plan plan = reservationSystem.getPlan(planQName);
Assert.assertNotNull(plan);
Assert.assertTrue(plan instanceof InMemoryPlan);
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert.assertTrue(
plan.getReservationAgent() instanceof GreedyReservationAgent);
Assert.assertTrue(
plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
public static void validateNewReservationQueue(
AbstractReservationSystem reservationSystem, String newQ) {
Plan newPlan = reservationSystem.getPlan(newQ);
Assert.assertNotNull(newPlan);
Assert.assertTrue(newPlan instanceof InMemoryPlan);
Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert
.assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public CapacityScheduler mockCapacityScheduler(int numContainers) public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException { throws IOException {
@ -70,6 +111,29 @@ public class ReservationSystemTestUtil {
CapacityScheduler cs = Mockito.spy(new CapacityScheduler()); CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
cs.setConf(new YarnConfiguration()); cs.setConf(new YarnConfiguration());
RMContext mockRmContext = createRMContext(conf);
cs.setRMContext(mockRmContext);
try {
cs.serviceInit(conf);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
initializeRMContext(numContainers, cs, mockRmContext);
return cs;
}
public static void initializeRMContext(int numContainers,
AbstractYarnScheduler scheduler, RMContext mockRMContext) {
when(mockRMContext.getScheduler()).thenReturn(scheduler);
Resource r = calculateClusterResource(numContainers);
doReturn(r).when(scheduler).getClusterResource();
}
public static RMContext createRMContext(Configuration conf) {
RMContext mockRmContext = RMContext mockRmContext =
Mockito.spy(new RMContextImpl(null, null, null, null, null, null, Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(conf), new RMContainerTokenSecretManager(conf),
@ -78,7 +142,7 @@ public class ReservationSystemTestUtil {
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when( when(
nlm.getQueueResource(any(String.class), any(Set.class), nlm.getQueueResource(any(String.class), anySetOf(String.class),
any(Resource.class))).thenAnswer(new Answer<Resource>() { any(Resource.class))).thenAnswer(new Answer<Resource>() {
@Override @Override
public Resource answer(InvocationOnMock invocation) throws Throwable { public Resource answer(InvocationOnMock invocation) throws Throwable {
@ -97,17 +161,7 @@ public class ReservationSystemTestUtil {
}); });
mockRmContext.setNodeLabelManager(nlm); mockRmContext.setNodeLabelManager(nlm);
return mockRmContext;
cs.setRMContext(mockRmContext);
try {
cs.serviceInit(conf);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
when(mockRmContext.getScheduler()).thenReturn(cs);
Resource r = Resource.newInstance(numContainers * 1024, numContainers);
doReturn(r).when(cs).getClusterResource();
return cs;
} }
public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@ -254,4 +308,9 @@ public class ReservationSystemTestUtil {
return req; return req;
} }
public static Resource calculateClusterResource(int numContainers) {
Resource clusterResource = Resource.newInstance(numContainers * 1024,
numContainers);
return clusterResource;
}
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -29,8 +30,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert; import org.junit.Assert;
@ -74,18 +75,18 @@ public class TestCapacityOverTimePolicy {
mAgent = mock(ReservationAgent.class); mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
String reservationQ = testUtil.getFullReservationQueueName(); String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); Resource clusterResource = testUtil.calculateClusterResource(totCont);
capConf.setReservationWindow(reservationQ, timeWindow); ReservationSchedulerConfiguration conf =
capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint); ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
capConf.setAverageCapacity(reservationQ, avgConstraint); instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf); policy.init(reservationQ, conf);
plan = plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, new InMemoryPlan(rootQueueMetrics, policy, mAgent,
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, clusterResource, step, res, minAlloc, maxAlloc,
"dedicated", null, true); "dedicated", null, true);
} }

View File

@ -47,15 +47,8 @@ public class TestCapacityReservationSystem {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
String planQName = testUtil.getreservationQueueName(); String planQName = testUtil.getreservationQueueName();
Plan plan = reservationSystem.getPlan(planQName); ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
Assert.assertNotNull(plan); planQName);
Assert.assertTrue(plan instanceof InMemoryPlan);
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert
.assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
} }
@Test @Test
@ -80,15 +73,7 @@ public class TestCapacityReservationSystem {
} }
// Assert queue in original config // Assert queue in original config
String planQName = testUtil.getreservationQueueName(); String planQName = testUtil.getreservationQueueName();
Plan plan = reservationSystem.getPlan(planQName); ReservationSystemTestUtil.validateReservationQueue(reservationSystem, planQName);
Assert.assertNotNull(plan);
Assert.assertTrue(plan instanceof InMemoryPlan);
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert
.assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
// Dynamically add a plan // Dynamically add a plan
String newQ = "reservation"; String newQ = "reservation";
@ -104,16 +89,6 @@ public class TestCapacityReservationSystem {
} catch (YarnException e) { } catch (YarnException e) {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
Plan newPlan = reservationSystem.getPlan(newQ); ReservationSystemTestUtil.validateNewReservationQueue(reservationSystem, newQ);
Assert.assertNotNull(newPlan);
Assert.assertTrue(newPlan instanceof InMemoryPlan);
Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert
.assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
} }
} }

View File

@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -71,18 +69,19 @@ public class TestGreedyReservationAgent {
Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
step = 1000L; step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(125);
String reservationQ = testUtil.getFullReservationQueueName(); String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow); float instConstraint = 100;
capConf.setMaximumCapacity(reservationQ, 100); float avgConstraint = 100;
capConf.setAverageCapacity(reservationQ, 100);
ReservationSchedulerConfiguration conf =
ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf); policy.init(reservationQ, conf);
agent = new GreedyReservationAgent(); agent = new GreedyReservationAgent();
QueueMetrics queueMetrics = QueueMetrics.forQueue("dedicated", QueueMetrics queueMetrics = mock(QueueMetrics.class);
mock(ParentQueue.class), false, capConf);
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
res, minAlloc, maxAlloc, "dedicated", null, true); res, minAlloc, maxAlloc, "dedicated", null, true);
@ -549,12 +548,13 @@ public class TestGreedyReservationAgent {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
String reservationQ = testUtil.getFullReservationQueueName(); String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); float instConstraint = 100;
capConf.setReservationWindow(reservationQ, timeWindow); float avgConstraint = 100;
capConf.setMaximumCapacity(reservationQ, 100); ReservationSchedulerConfiguration conf =
capConf.setAverageCapacity(reservationQ, 100); ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf); policy.init(reservationQ, conf);
plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);

View File

@ -27,8 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before; import org.junit.Before;
@ -60,15 +59,17 @@ public class TestNoOverCommitPolicy {
mAgent = mock(ReservationAgent.class); mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
String reservationQ = testUtil.getFullReservationQueueName(); String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
Resource clusterResource = testUtil.calculateClusterResource(totCont);
ReservationSchedulerConfiguration conf = mock
(ReservationSchedulerConfiguration.class);
NoOverCommitPolicy policy = new NoOverCommitPolicy(); NoOverCommitPolicy policy = new NoOverCommitPolicy();
policy.init(reservationQ, capConf); policy.init(reservationQ, conf);
plan = plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, new InMemoryPlan(rootQueueMetrics, policy, mAgent,
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, clusterResource, step, res, minAlloc, maxAlloc,
"dedicated", null, true); "dedicated", null, true);
} }

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -60,13 +59,10 @@ public class TestSimpleCapacityReplanner {
when(clock.getTime()).thenReturn(0L); when(clock.getTime()).thenReturn(0L);
SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock); SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
CapacitySchedulerConfiguration conf = ReservationSchedulerConfiguration conf =
mock(CapacitySchedulerConfiguration.class); mock(ReservationSchedulerConfiguration.class);
when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L); when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah"
+ CapacitySchedulerConfiguration.DOT
+ CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6);
enf.init("blah", conf); enf.init("blah", conf);
// Initialize the plan with more resources // Initialize the plan with more resources