YARN-3739. Add reservation system recovery to RM recovery process. Contributed by Subru Krishnan.

(cherry picked from commit 2798723a54)
This commit is contained in:
Anubhav Dhoot 2015-10-22 06:36:58 -07:00
parent 3ed9db0a38
commit c44401f362
21 changed files with 556 additions and 185 deletions

View File

@ -183,6 +183,9 @@ Release 2.8.0 - UNRELEASED
YARN-4262. Allow whitelisted users to run privileged docker containers.
(Sidharta Seethana via vvasudev)
YARN-3739. Add reservation system recovery to RM recovery process.
(Subru Krishnan via adhoot)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -1362,7 +1362,7 @@ public class ClientRMService extends AbstractService implements
.format(
"Reservation {0} is within threshold so attempting to create synchronously.",
reservationId));
reservationSystem.synchronizePlan(planName);
reservationSystem.synchronizePlan(planName, true);
LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
reservationId));
}

View File

@ -1186,6 +1186,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
// recover AMRMTokenSecretManager
rmContext.getAMRMTokenSecretManager().recover(state);
// recover reservations
if (reservationSystem != null) {
reservationSystem.recover(state);
}
// recover applications
rmAppManager.recover(state);

View File

@ -18,16 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -38,8 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -52,6 +45,17 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This is the implementation of {@link ReservationSystem} based on the
* {@link ResourceScheduler}
@ -94,6 +98,8 @@ public abstract class AbstractReservationSystem extends AbstractService
private PlanFollower planFollower;
private boolean isRecoveryEnabled = false;
/**
* Construct the service.
*
@ -149,6 +155,49 @@ public abstract class AbstractReservationSystem extends AbstractService
Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan);
}
isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
}
private void loadPlan(String planName,
Map<ReservationId, ReservationAllocationStateProto> reservations)
throws PlanningException {
Plan plan = plans.get(planName);
Resource minAllocation = getMinAllocation();
ResourceCalculator rescCalculator = getResourceCalculator();
for (Entry<ReservationId, ReservationAllocationStateProto> currentReservation : reservations
.entrySet()) {
plan.addReservation(ReservationSystemUtil.toInMemoryAllocation(planName,
currentReservation.getKey(), currentReservation.getValue(),
minAllocation, rescCalculator), true);
resQMap.put(currentReservation.getKey(), planName);
}
LOG.info("Recovered reservations for Plan: {}", planName);
}
@Override
public void recover(RMState state) throws Exception {
LOG.info("Recovering Reservation system");
writeLock.lock();
try {
Map<String, Map<ReservationId, ReservationAllocationStateProto>> reservationSystemState =
state.getReservationState();
if (planFollower != null) {
for (String plan : plans.keySet()) {
// recover reservations if any from state store
if (reservationSystemState.containsKey(plan)) {
loadPlan(plan, reservationSystemState.get(plan));
}
synchronizePlan(plan, false);
}
startPlanFollower(conf.getLong(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS));
}
} finally {
writeLock.unlock();
}
}
private void initializeNewPlans(Configuration conf) {
@ -162,7 +211,7 @@ public abstract class AbstractReservationSystem extends AbstractService
Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan);
} else {
LOG.warn("Plan based on reservation queue {0} already exists.",
LOG.warn("Plan based on reservation queue {} already exists.",
planQueueName);
}
}
@ -236,18 +285,26 @@ public abstract class AbstractReservationSystem extends AbstractService
}
@Override
public void synchronizePlan(String planName) {
public void synchronizePlan(String planName, boolean shouldReplan) {
writeLock.lock();
try {
Plan plan = plans.get(planName);
if (plan != null) {
planFollower.synchronizePlan(plan);
planFollower.synchronizePlan(plan, shouldReplan);
}
} finally {
writeLock.unlock();
}
}
private void startPlanFollower(long initialDelay) {
if (planFollower != null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.scheduleWithFixedDelay(planFollower,
initialDelay, planStepSize, TimeUnit.MILLISECONDS);
}
}
@Override
public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
@ -262,10 +319,8 @@ public abstract class AbstractReservationSystem extends AbstractService
@Override
public void serviceStart() throws Exception {
if (planFollower != null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
planStepSize, TimeUnit.MILLISECONDS);
if (!isRecoveryEnabled) {
startPlanFollower(planStepSize);
}
super.serviceStart();
}
@ -350,7 +405,7 @@ public abstract class AbstractReservationSystem extends AbstractService
minAllocation, maxAllocation, planQueueName,
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
.getMoveOnExpiry(planQueuePath), rmContext);
LOG.info("Intialized plan {0} based on reservable queue {1}",
LOG.info("Intialized plan {} based on reservable queue {}",
plan.toString(), planQueueName);
return plan;
}

View File

@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,7 +43,7 @@ import java.util.Set;
public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
private static final Logger LOG = LoggerFactory
.getLogger(CapacitySchedulerPlanFollower.class);
.getLogger(AbstractSchedulerPlanFollower.class);
protected Collection<Plan> plans = new ArrayList<Plan>();
protected YarnScheduler scheduler;
@ -59,7 +59,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
@Override
public synchronized void run() {
for (Plan plan : plans) {
synchronizePlan(plan);
synchronizePlan(plan, true);
}
}
@ -70,7 +70,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
@Override
public synchronized void synchronizePlan(Plan plan) {
public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
String planQueueName = plan.getQueueName();
if (LOG.isDebugEnabled()) {
LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
@ -88,14 +88,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
Resource clusterResources = scheduler.getClusterResource();
Resource planResources = getPlanResources(plan, planQueue,
clusterResources);
Set<ReservationAllocation> currentReservations =
plan.getReservationsAtTime(now);
Set<String> curReservationNames = new HashSet<String>();
Resource reservedResources = Resource.newInstance(0, 0);
int numRes = getReservedResources(now, currentReservations,
curReservationNames, reservedResources);
// create the default reservation queue if it doesnt exist
String defReservationId = getReservationIdFromQueueName(planQueueName) +
ReservationConstants.DEFAULT_QUEUE_SUFFIX;
@ -104,14 +102,18 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
createDefaultReservationQueue(planQueueName, planQueue,
defReservationId);
curReservationNames.add(defReservationId);
// if the resources dedicated to this plan has shrunk invoke replanner
if (arePlanResourcesLessThanReservations(clusterResources, planResources,
reservedResources)) {
try {
plan.getReplanner().plan(plan, null);
} catch (PlanningException e) {
LOG.warn("Exception while trying to replan: {}", planQueueName, e);
boolean shouldResize = false;
if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(),
clusterResources, planResources, reservedResources)) {
if (shouldReplan) {
try {
plan.getReplanner().plan(plan, null);
} catch (PlanningException e) {
LOG.warn("Exception while trying to replan: {}", planQueueName, e);
}
} else {
shouldResize = true;
}
}
// identify the reservations that have expired and new reservations that
@ -133,7 +135,6 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
// garbage collect expired reservations
cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
defReservationQueue);
// Add new reservations and update existing ones
float totalAssignedCapacity = 0f;
if (currentReservations != null) {
@ -146,9 +147,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
planQueueName, e);
}
// sort allocations from the one giving up the most resources, to the
// one asking for the most
// avoid order-of-operation errors that temporarily violate 100%
// capacity bound
// one asking for the most avoid order-of-operation errors that
// temporarily violate 100% capacity bound
List<ReservationAllocation> sortedAllocations =
sortByDelta(
new ArrayList<ReservationAllocation>(currentReservations), now,
@ -162,10 +162,15 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
float targetCapacity = 0f;
if (planResources.getMemory() > 0
&& planResources.getVirtualCores() > 0) {
if (shouldResize) {
capToAssign =
calculateReservationToPlanProportion(
plan.getResourceCalculator(), planResources,
reservedResources, capToAssign);
}
targetCapacity =
calculateReservationToPlanRatio(clusterResources,
planResources,
capToAssign);
calculateReservationToPlanRatio(plan.getResourceCalculator(),
clusterResources, planResources, capToAssign);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
@ -211,7 +216,6 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
}
LOG.info("Finished iteration of plan follower edit policy for plan: "
+ planQueueName);
// Extension: update plan with app states,
// useful to support smart replanning
}
@ -323,19 +327,35 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
*/
protected abstract Queue getPlanQueue(String planQueueName);
/**
* Resizes reservations based on currently available resources
*/
private Resource calculateReservationToPlanProportion(
ResourceCalculator rescCalculator, Resource availablePlanResources,
Resource totalReservationResources, Resource reservationResources) {
return Resources.multiply(availablePlanResources, Resources.ratio(
rescCalculator, reservationResources, totalReservationResources));
}
/**
* Calculates ratio of reservationResources to planResources
*/
protected abstract float calculateReservationToPlanRatio(
Resource clusterResources, Resource planResources,
Resource reservationResources);
private float calculateReservationToPlanRatio(
ResourceCalculator rescCalculator, Resource clusterResources,
Resource planResources, Resource reservationResources) {
return Resources.divide(rescCalculator, clusterResources,
reservationResources, planResources);
}
/**
* Check if plan resources are less than expected reservation resources
*/
protected abstract boolean arePlanResourcesLessThanReservations(
Resource clusterResources, Resource planResources,
Resource reservedResources);
private boolean arePlanResourcesLessThanReservations(
ResourceCalculator rescCalculator, Resource clusterResources,
Resource planResources, Resource reservedResources) {
return Resources.greaterThan(rescCalculator, clusterResources,
reservedResources, planResources);
}
/**
* Get a list of reservation queues for this planQueue
@ -363,7 +383,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
Plan plan, Queue queue, Resource clusterResources);
/**
* Get reservation queue resources if it exists otherwise return null
* Get reservation queue resources if it exists otherwise return null.
*/
protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
ReservationId reservationId);

View File

@ -80,22 +80,6 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower
return queue;
}
@Override
protected float calculateReservationToPlanRatio(
Resource clusterResources, Resource planResources,
Resource reservationResources) {
return Resources.divide(cs.getResourceCalculator(),
clusterResources, reservationResources, planResources);
}
@Override
protected boolean arePlanResourcesLessThanReservations(
Resource clusterResources, Resource planResources,
Resource reservedResources) {
return Resources.greaterThan(cs.getResourceCalculator(),
clusterResources, reservedResources, planResources);
}
@Override
protected List<? extends Queue> getChildReservationQueues(Queue queue) {
PlanQueue planQueue = (PlanQueue)queue;

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,20 +58,6 @@ public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower {
return planQueue;
}
@Override
protected float calculateReservationToPlanRatio(Resource clusterResources,
Resource planResources, Resource capToAssign) {
return Resources.divide(fs.getResourceCalculator(),
clusterResources, capToAssign, planResources);
}
@Override
protected boolean arePlanResourcesLessThanReservations(Resource
clusterResources, Resource planResources, Resource reservedResources) {
return Resources.greaterThan(fs.getResourceCalculator(),
clusterResources, reservedResources, planResources);
}
@Override
protected List<? extends Queue> getChildReservationQueues(Queue queue) {
FSQueue planQueue = (FSQueue)queue;

View File

@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
@ -54,7 +55,7 @@ public class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
private final RMContext rmContext;
private final RMStateStore rmStateStore;
private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
@ -112,7 +113,7 @@ public class InMemoryPlan implements Plan {
this.replanner = replanner;
this.getMoveOnExpiry = getMoveOnExpiry;
this.clock = clock;
this.rmContext = rmContext;
this.rmStateStore = rmContext.getStateStore();
}
@Override
@ -174,8 +175,8 @@ public class InMemoryPlan implements Plan {
}
@Override
public boolean addReservation(ReservationAllocation reservation)
throws PlanningException {
public boolean addReservation(ReservationAllocation reservation,
boolean isRecovering) throws PlanningException {
// Verify the allocation is memory based otherwise it is not supported
InMemoryReservationAllocation inMemReservation =
(InMemoryReservationAllocation) reservation;
@ -198,9 +199,16 @@ public class InMemoryPlan implements Plan {
}
// Validate if we can accept this reservation, throws exception if
// validation fails
policy.validate(this, inMemReservation);
// we record here the time in which the allocation has been accepted
reservation.setAcceptanceTimestamp(clock.getTime());
if (!isRecovering) {
policy.validate(this, inMemReservation);
// we record here the time in which the allocation has been accepted
reservation.setAcceptanceTimestamp(clock.getTime());
if (rmStateStore != null) {
rmStateStore.storeNewReservation(
ReservationSystemUtil.buildStateProto(inMemReservation),
getQueueName(), inMemReservation.getReservationId().toString());
}
}
ReservationInterval searchInterval =
new ReservationInterval(inMemReservation.getStartTime(),
inMemReservation.getEndTime());
@ -217,9 +225,6 @@ public class InMemoryPlan implements Plan {
currentReservations.put(searchInterval, reservations);
reservationTable.put(inMemReservation.getReservationId(),
inMemReservation);
rmContext.getStateStore().storeNewReservation(
ReservationSystemUtil.buildStateProto(inMemReservation),
getQueueName(), inMemReservation.getReservationId().toString());
incrementAllocation(inMemReservation);
LOG.info("Sucessfully added reservation: {} to plan.",
inMemReservation.getReservationId());
@ -253,7 +258,7 @@ public class InMemoryPlan implements Plan {
return result;
}
try {
result = addReservation(reservation);
result = addReservation(reservation, false);
} catch (PlanningException e) {
LOG.error("Unable to update reservation: {} from plan due to {}.",
reservation.getReservationId(), e.getMessage());
@ -264,7 +269,7 @@ public class InMemoryPlan implements Plan {
return result;
} else {
// rollback delete
addReservation(currReservation);
addReservation(currReservation, false);
LOG.info("Rollbacked update reservation: {} from plan.",
reservation.getReservationId());
return result;
@ -282,6 +287,10 @@ public class InMemoryPlan implements Plan {
Set<InMemoryReservationAllocation> reservations =
currentReservations.get(searchInterval);
if (reservations != null) {
if (rmStateStore != null) {
rmStateStore.removeReservation(getQueueName(),
reservation.getReservationId().toString());
}
if (!reservations.remove(reservation)) {
LOG.error("Unable to remove reservation: {} from plan.",
reservation.getReservationId());
@ -298,8 +307,6 @@ public class InMemoryPlan implements Plan {
throw new IllegalArgumentException(errMsg);
}
reservationTable.remove(reservation.getReservationId());
rmContext.getStateStore().removeReservation(
getQueueName(), reservation.getReservationId().toString());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
reservation.getReservationId());

View File

@ -32,10 +32,12 @@ public interface PlanEdit extends PlanContext, PlanView {
*
* @param reservation the {@link ReservationAllocation} to be added to the
* plan
* @param isRecovering flag to indicate if reservation is being added as part
* of failover or not
* @return true if addition is successful, false otherwise
*/
public boolean addReservation(ReservationAllocation reservation)
throws PlanningException;
public boolean addReservation(ReservationAllocation reservation,
boolean isRecovering) throws PlanningException;
/**
* Updates an existing {@link ReservationAllocation} in the plan. This is

View File

@ -71,8 +71,10 @@ public interface PlanFollower extends Runnable {
* start time is imminent.
*
* @param plan the Plan to synchronize
* @param shouldReplan replan on reduction of plan capacity if true or
* proportionally scale down reservations if false
*/
public void synchronizePlan(Plan plan);
public void synchronizePlan(Plan plan, boolean shouldReplan);
/**
* Setter for the list of plans.

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -40,7 +41,7 @@ import java.util.Map;
*/
@LimitedPrivate("yarn")
@Unstable
public interface ReservationSystem {
public interface ReservationSystem extends Recoverable {
/**
* Set RMContext for {@link ReservationSystem}. This method should be called
@ -82,8 +83,10 @@ public interface ReservationSystem {
* the {@link ResourceScheduler}
*
* @param planName the name of the {@link Plan} to be synchronized
* @param shouldReplan replan on reduction of plan capacity if true or
* proportionally scale down reservations if false
*/
void synchronizePlan(String planName);
void synchronizePlan(String planName, boolean shouldReplan);
/**
* Return the time step (ms) at which the {@link PlanFollower} is invoked

View File

@ -94,7 +94,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent {
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation);
return plan.addReservation(capReservation, false);
}
}

View File

@ -18,15 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -38,7 +43,7 @@ import org.junit.Test;
import java.util.Map;
public class TestReservationSystemWithRMHA extends RMHATestBase{
public class TestReservationSystemWithRMHA extends RMHATestBase {
@Override
public void setup() throws Exception {
@ -56,7 +61,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
public void testSubmitReservationAndCheckAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
@ -72,8 +77,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition = request
.getReservationDefinition();
// Do the failover
explicitFailover();
@ -87,12 +90,11 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
Assert.assertNotNull(reservationStateMap.get(reservationID));
}
@Test
public void testUpdateReservationAndCheckAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
@ -108,17 +110,15 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition = request
.getReservationDefinition();
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// Change any field
long newDeadline = reservationDefinition.getDeadline() + 100;
reservationDefinition.setDeadline(newDeadline);
ReservationUpdateRequest updateRequest =
ReservationUpdateRequest.newInstance(
reservationDefinition, reservationID);
ReservationUpdateRequest updateRequest = ReservationUpdateRequest
.newInstance(reservationDefinition, reservationID);
rm1.updateReservationState(updateRequest);
// Do the failover
@ -140,7 +140,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
public void testDeleteReservationAndCheckAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
@ -156,7 +156,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
// Delete the reservation
ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationID);
@ -168,32 +167,31 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
rm2.registerNode("127.0.0.1:1", 102400, 100);
RMState state = rm2.getRMContext().getStateStore().loadState();
Assert.assertNull(state.getReservationState().get(
ReservationSystemTestUtil.reservationQ));
Assert.assertNull(state.getReservationState()
.get(ReservationSystemTestUtil.reservationQ));
}
private void addNodeCapacityToPlan() {
private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
try {
rm1.registerNode("127.0.0.1:1", 102400, 100);
rm.registerNode("127.0.0.1:1", memory, vCores);
int attempts = 10;
do {
DrainDispatcher dispatcher =
(DrainDispatcher) rm1.getRMContext().getDispatcher();
dispatcher.await();
rm1.getRMContext().getReservationSystem().synchronizePlan(
ReservationSystemTestUtil.reservationQ);
if (rm1.getRMContext().getReservationSystem().getPlan
(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
rm.getRMContext().getReservationSystem()
.synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
if (rm.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 0) {
break;
}
LOG.info("Waiting for node capacity to be added to plan");
Thread.sleep(100);
}
while (attempts-- > 0);
} while (attempts-- > 0);
if (attempts <= 0) {
Assert.fail("Exhausted attempts in checking if node capacity was " +
"added to the plan");
Assert.fail("Exhausted attempts in checking if node capacity was "
+ "added to the plan");
}
} catch (Exception e) {
@ -205,8 +203,316 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
long deadline = (long) (arrival + duration + 1500);
return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
deadline, duration);
}
private void validateReservation(Plan plan, ReservationId resId,
ReservationDefinition rDef) {
ReservationAllocation reservation = plan.getReservationById(resId);
Assert.assertNotNull(reservation);
Assert.assertEquals(rDef.getDeadline(),
reservation.getReservationDefinition().getDeadline());
}
@Test
public void testSubmitReservationFailoverAndDelete() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// check if reservation exists after failover
Plan plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// delete the reservation
ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse deleteResponse = null;
clientService = rm2.getClientRMService();
try {
deleteResponse = clientService.deleteReservation(deleteRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(deleteResponse);
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testFailoverAndSubmitReservation() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// create a reservation
ClientRMService clientService = rm2.getClientRMService();
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// check if reservation is submitted successfully
Plan plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
}
@Test
public void testSubmitReservationFailoverAndUpdate() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// check if reservation exists after failover
Plan plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// update the reservation
long newDeadline = reservationDefinition.getDeadline() + 100;
reservationDefinition.setDeadline(newDeadline);
ReservationUpdateRequest updateRequest = ReservationUpdateRequest
.newInstance(reservationDefinition, reservationID);
ReservationUpdateResponse updateResponse = null;
clientService = rm2.getClientRMService();
try {
updateResponse = clientService.updateReservation(updateRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(updateResponse);
validateReservation(plan, reservationID, reservationDefinition);
}
@Test
public void testSubmitUpdateReservationFailoverAndDelete() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
// check if reservation is submitted successfully
Plan plan = rm1.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// update the reservation
long newDeadline = reservationDefinition.getDeadline() + 100;
reservationDefinition.setDeadline(newDeadline);
ReservationUpdateRequest updateRequest = ReservationUpdateRequest
.newInstance(reservationDefinition, reservationID);
ReservationUpdateResponse updateResponse = null;
try {
updateResponse = clientService.updateReservation(updateRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(updateResponse);
validateReservation(plan, reservationID, reservationDefinition);
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
// check if reservation exists after failover
plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, reservationID, reservationDefinition);
// delete the reservation
ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse deleteResponse = null;
clientService = rm2.getClientRMService();
try {
deleteResponse = clientService.deleteReservation(deleteRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(deleteResponse);
Assert.assertNull(plan.getReservationById(reservationID));
}
@Test
public void testReservationResizeAfterFailover() throws Exception {
startRMs();
addNodeCapacityToPlan(rm1, 102400, 100);
ClientRMService clientService = rm1.getClientRMService();
// create 3 reservations
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId resID1 = response.getReservationId();
Assert.assertNotNull(resID1);
LOG.info("Submit reservation response: " + resID1);
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId resID2 = response.getReservationId();
Assert.assertNotNull(resID2);
LOG.info("Submit reservation response: " + resID2);
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId resID3 = response.getReservationId();
Assert.assertNotNull(resID3);
LOG.info("Submit reservation response: " + resID3);
// allow the reservations to become active
waitForReservationActivation(rm1, resID1,
ReservationSystemTestUtil.reservationQ);
// validate reservations before failover
Plan plan = rm1.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, resID1, reservationDefinition);
validateReservation(plan, resID2, reservationDefinition);
validateReservation(plan, resID3, reservationDefinition);
ResourceScheduler scheduler = rm1.getResourceScheduler();
QueueInfo resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
Assert.assertEquals(0.05, resQ1.getCapacity(), 0.001f);
QueueInfo resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
Assert.assertEquals(0.05, resQ2.getCapacity(), 0.001f);
QueueInfo resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
Assert.assertEquals(0.05, resQ3.getCapacity(), 0.001f);
// Do the failover
explicitFailover();
addNodeCapacityToPlan(rm2, 5120, 5);
// check if reservations exists after failover
plan = rm2.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ);
validateReservation(plan, resID1, reservationDefinition);
validateReservation(plan, resID3, reservationDefinition);
// verify if the reservations have been resized
scheduler = rm2.getResourceScheduler();
resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
Assert.assertEquals(1f / 3f, resQ1.getCapacity(), 0.001f);
resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
Assert.assertEquals(1f / 3f, resQ2.getCapacity(), 0.001f);
resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
Assert.assertEquals(1f / 3f, resQ3.getCapacity(), 0.001f);
}
private void waitForReservationActivation(MockRM rm,
ReservationId reservationId, String planName) {
try {
int attempts = 20;
do {
rm.getRMContext().getReservationSystem().synchronizePlan(planName,
false);
if (rm.getResourceScheduler()
.getQueueInfo(reservationId.toString(), false, false)
.getCapacity() > 0f) {
break;
}
LOG.info("Waiting for reservation to be active");
Thread.sleep(100);
} while (attempts-- > 0);
if (attempts <= 0) {
Assert
.fail("Exceeded attempts in waiting for reservation to be active");
}
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}

View File

@ -215,7 +215,6 @@ public class ReservationSystemTestUtil {
return context;
}
@SuppressWarnings("unchecked")
public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException {
// stolen from TestCapacityScheduler

View File

@ -75,10 +75,11 @@ public class TestCapacityOverTimePolicy {
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
String reservationQ = testUtil.getFullReservationQueueName();
Resource clusterResource = testUtil.calculateClusterResource(totCont);
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
Resource clusterResource =
ReservationSystemTestUtil.calculateClusterResource(totCont);
ReservationSchedulerConfiguration conf =
ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
instConstraint, avgConstraint);
@ -113,7 +114,7 @@ public class TestCapacityOverTimePolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
@Test
@ -130,7 +131,7 @@ public class TestCapacityOverTimePolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
@Test
@ -146,7 +147,7 @@ public class TestCapacityOverTimePolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
}
@ -163,7 +164,7 @@ public class TestCapacityOverTimePolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
}
@ -179,7 +180,7 @@ public class TestCapacityOverTimePolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
Assert.fail("should not have accepted this");
}
@ -195,20 +196,20 @@ public class TestCapacityOverTimePolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
Assert.fail();
} catch (PlanningQuotaException p) {
// expected
@ -232,7 +233,7 @@ public class TestCapacityOverTimePolicy {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
}
@Test
@ -251,13 +252,13 @@ public class TestCapacityOverTimePolicy {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
Assert.fail("should not have accepted this");
} catch (PlanningQuotaException e) {

View File

@ -113,7 +113,7 @@ public class TestInMemoryPlan {
start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
@ -147,7 +147,7 @@ public class TestInMemoryPlan {
start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
@ -175,7 +175,7 @@ public class TestInMemoryPlan {
start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
@ -189,7 +189,7 @@ public class TestInMemoryPlan {
// Try to add it again
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
Assert.fail("Add should fail as it already exists");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().endsWith("already exists"));
@ -221,7 +221,7 @@ public class TestInMemoryPlan {
start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
@ -316,7 +316,7 @@ public class TestInMemoryPlan {
start, start + alloc.length, allocs, resCalc, minAlloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
@ -388,7 +388,7 @@ public class TestInMemoryPlan {
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID1));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
@ -419,7 +419,7 @@ public class TestInMemoryPlan {
minAlloc);
Assert.assertNull(plan.getReservationById(reservationID2));
try {
plan.addReservation(rAllocation);
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}

View File

@ -61,10 +61,11 @@ public class TestNoOverCommitPolicy {
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
String reservationQ = testUtil.getFullReservationQueueName();
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
Resource clusterResource = testUtil.calculateClusterResource(totCont);
Resource clusterResource =
ReservationSystemTestUtil.calculateClusterResource(totCont);
ReservationSchedulerConfiguration conf = mock
(ReservationSchedulerConfiguration.class);
NoOverCommitPolicy policy = new NoOverCommitPolicy();
@ -97,7 +98,7 @@ public class TestNoOverCommitPolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
@Test
@ -113,7 +114,7 @@ public class TestNoOverCommitPolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
@Test(expected = ResourceOverCommitException.class)
@ -123,7 +124,7 @@ public class TestNoOverCommitPolicy {
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
.generateAllocation(initTime, step, f), res, minAlloc), false);
}
@Test(expected = MismatchedUserException.class)
@ -137,7 +138,7 @@ public class TestNoOverCommitPolicy {
plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
.generateAllocation(initTime, step, f), res, minAlloc), false);
// trying to update a reservation with a mismatching user
plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2",
@ -158,7 +159,7 @@ public class TestNoOverCommitPolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
}
@ -175,7 +176,7 @@ public class TestNoOverCommitPolicy {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
res, minAlloc), false));
}
}
}

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -38,12 +43,6 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public abstract class TestSchedulerPlanFollowerBase {
final static int GB = 1024;
protected Clock mClock = null;
@ -75,20 +74,20 @@ public abstract class TestSchedulerPlanFollowerBase {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
.generateAllocation(0L, 1L, f1), res, minAlloc)));
.generateAllocation(0L, 1L, f1), res, minAlloc), false));
ReservationId r2 = ReservationId.newInstance(ts, 2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3",
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
.generateAllocation(3L, 1L, f1), res, minAlloc)));
.generateAllocation(3L, 1L, f1), res, minAlloc), false));
ReservationId r3 = ReservationId.newInstance(ts, 3);
int[] f2 = { 0, 10, 20, 10, 0 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4",
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
.generateAllocation(10L, 1L, f2), res, minAlloc)));
.generateAllocation(10L, 1L, f2), res, minAlloc), false));
AbstractSchedulerPlanFollower planFollower = createPlanFollower();

View File

@ -711,9 +711,8 @@ public class TestAlignedPlanner {
Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
// Set configuration
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
String reservationQ = testUtil.getFullReservationQueueName();
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
float instConstraint = 100;
float avgConstraint = 100;
@ -792,7 +791,7 @@ public class TestAlignedPlanner {
ReservationSystemTestUtil.getNewReservationId(), rDef,
"user_fixed", "dedicated", start, start + f.length * step,
ReservationSystemTestUtil.generateAllocation(start, step, f), res,
minAlloc)));
minAlloc), false));
}

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -76,8 +75,8 @@ public class TestGreedyReservationAgent {
long timeWindow = 1000000L;
Resource clusterCapacity = Resource.newInstance(100 * 1024, 100);
step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
String reservationQ = testUtil.getFullReservationQueueName();
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
float instConstraint = 100;
float avgConstraint = 100;
@ -151,7 +150,7 @@ public class TestGreedyReservationAgent {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
res, minAlloc)));
res, minAlloc), false));
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
@ -208,7 +207,7 @@ public class TestGreedyReservationAgent {
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", 30 * step, 30 * step + f.length * step,
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
res, minAlloc)));
res, minAlloc), false));
// create a chain of 4 RR, mixing gang and non-gang
ReservationDefinition rr = new ReservationDefinitionPBImpl();
@ -529,7 +528,7 @@ public class TestGreedyReservationAgent {
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
.generateAllocation(0, step, f), res, minAlloc)));
.generateAllocation(0, step, f), res, minAlloc), false));
int[] f2 = { 5, 5, 5, 5, 5, 5, 5 };
Map<ReservationInterval, Resource> alloc =
@ -537,7 +536,8 @@ public class TestGreedyReservationAgent {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
"dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc),
false));
System.out.println("--------BEFORE AGENT----------");
System.out.println(plan.toString());
@ -563,7 +563,8 @@ public class TestGreedyReservationAgent {
step = 1000L;
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100);
String reservationQ = testUtil.getFullReservationQueueName();
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
float instConstraint = 100;
float avgConstraint = 100;
ReservationSchedulerConfiguration conf =

View File

@ -93,44 +93,44 @@ public class TestSimpleCapacityReplanner {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
minAlloc), false));
when(clock.getTime()).thenReturn(1L);
ReservationId r2 = ReservationId.newInstance(ts, 2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
minAlloc), false));
when(clock.getTime()).thenReturn(2L);
ReservationId r3 = ReservationId.newInstance(ts, 3);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
minAlloc), false));
when(clock.getTime()).thenReturn(3L);
ReservationId r4 = ReservationId.newInstance(ts, 4);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
minAlloc), false));
when(clock.getTime()).thenReturn(4L);
ReservationId r5 = ReservationId.newInstance(ts, 5);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7",
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
minAlloc)));
minAlloc), false));
int[] f6 = { 50, 50, 50, 50, 50 };
ReservationId r6 = ReservationId.newInstance(ts, 6);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3",
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
minAlloc)));
minAlloc), false));
when(clock.getTime()).thenReturn(6L);
ReservationId r7 = ReservationId.newInstance(ts, 7);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4",
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
minAlloc)));
minAlloc), false));
// remove some of the resources (requires replanning)
plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));