YARN-1712. Plan follower that synchronizes the current state of reservation subsystem with the scheduler. Contributed by Subru Krishnan and Carlo Curino.

This commit is contained in:
subru 2014-09-16 16:45:45 -07:00
parent c4918cb4cb
commit 169085319b
4 changed files with 758 additions and 2 deletions

View File

@ -5,10 +5,10 @@ YARN-2475. Logic for responding to capacity drops for the
ReservationSystem. (Carlo Curino and Subru Krishnan via curino) ReservationSystem. (Carlo Curino and Subru Krishnan via curino)
YARN-1708. Public YARN APIs for creating/updating/deleting YARN-1708. Public YARN APIs for creating/updating/deleting
reservations. (Carlo Curino and Subru Krishnan via subru) reservations. (Subru Krishnan and Carlo Curino via subru)
YARN-1709. In-memory data structures used to track resources over YARN-1709. In-memory data structures used to track resources over
time to enable reservations. (Carlo Curino and Subru Krishnan via time to enable reservations. (Subru Krishnan and Carlo Curino via
subru) subru)
YARN-1710. Logic to find allocations within a Plan that satisfy YARN-1710. Logic to find allocations within a Plan that satisfy
@ -17,3 +17,6 @@ curino)
YARN-1711. Policy to enforce instantaneous and over-time quotas YARN-1711. Policy to enforce instantaneous and over-time quotas
on user reservations. (Carlo Curino and Subru Krishnan via curino) on user reservations. (Carlo Curino and Subru Krishnan via curino)
YARN-1712. Plan follower that synchronizes the current state of reservation
subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)

View File

@ -0,0 +1,367 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements a {@link PlanFollower}. This is invoked on a timer, and
* it is in charge to publish the state of the {@link Plan}s to the underlying
* {@link CapacityScheduler}. This implementation does so, by
* adding/removing/resizing leaf queues in the scheduler, thus affecting the
* dynamic behavior of the scheduler in a way that is consistent with the
* content of the plan. It also updates the plan's view on how much resources
* are available in the cluster.
*
* This implementation of PlanFollower is relatively stateless, and it can
* synchronize schedulers and Plans that have arbitrary changes (performing set
* differences among existing queues). This makes it resilient to frequency of
* synchronization, and RM restart issues (no "catch up" is necessary).
*/
public class CapacitySchedulerPlanFollower implements PlanFollower {
private static final Logger LOG = LoggerFactory
.getLogger(CapacitySchedulerPlanFollower.class);
private Collection<Plan> plans = new ArrayList<Plan>();
private Clock clock;
private CapacityScheduler scheduler;
@Override
public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
LOG.info("Initializing Plan Follower Policy:"
+ this.getClass().getCanonicalName());
if (!(sched instanceof CapacityScheduler)) {
throw new YarnRuntimeException(
"CapacitySchedulerPlanFollower can only work with CapacityScheduler");
}
this.clock = clock;
this.scheduler = (CapacityScheduler) sched;
this.plans.addAll(plans);
}
@Override
public synchronized void run() {
for (Plan plan : plans) {
synchronizePlan(plan);
}
}
@Override
public synchronized void synchronizePlan(Plan plan) {
String planQueueName = plan.getQueueName();
if (LOG.isDebugEnabled()) {
LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
}
// align with plan step
long step = plan.getStep();
long now = clock.getTime();
if (now % step != 0) {
now += step - (now % step);
}
CSQueue queue = scheduler.getQueue(planQueueName);
if (!(queue instanceof PlanQueue)) {
LOG.error("The Plan is not an PlanQueue!");
return;
}
PlanQueue planQueue = (PlanQueue) queue;
// first we publish to the plan the current availability of resources
Resource clusterResources = scheduler.getClusterResource();
float planAbsCap = planQueue.getAbsoluteCapacity();
Resource planResources = Resources.multiply(clusterResources, planAbsCap);
plan.setTotalCapacity(planResources);
Set<ReservationAllocation> currentReservations =
plan.getReservationsAtTime(now);
Set<String> curReservationNames = new HashSet<String>();
Resource reservedResources = Resource.newInstance(0, 0);
int numRes = 0;
if (currentReservations != null) {
numRes = currentReservations.size();
for (ReservationAllocation reservation : currentReservations) {
curReservationNames.add(reservation.getReservationId().toString());
Resources.addTo(reservedResources, reservation.getResourcesAtTime(now));
}
}
// create the default reservation queue if it doesnt exist
String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
if (scheduler.getQueue(defReservationQueue) == null) {
ReservationQueue defQueue =
new ReservationQueue(scheduler, defReservationQueue, planQueue);
try {
scheduler.addQueue(defQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
"Exception while trying to create default reservation queue for plan: {}",
planQueueName, e);
}
}
curReservationNames.add(defReservationQueue);
// if the resources dedicated to this plan has shrunk invoke replanner
if (Resources.greaterThan(scheduler.getResourceCalculator(),
clusterResources, reservedResources, planResources)) {
try {
plan.getReplanner().plan(plan, null);
} catch (PlanningException e) {
LOG.warn("Exception while trying to replan: {}", planQueueName, e);
}
}
// identify the reservations that have expired and new reservations that
// have to be activated
List<CSQueue> resQueues = planQueue.getChildQueues();
Set<String> expired = new HashSet<String>();
for (CSQueue resQueue : resQueues) {
String resQueueName = resQueue.getQueueName();
if (curReservationNames.contains(resQueueName)) {
// it is already existing reservation, so needed not create new
// reservation queue
curReservationNames.remove(resQueueName);
} else {
// the reservation has termination, mark for cleanup
expired.add(resQueueName);
}
}
// garbage collect expired reservations
cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
// Add new reservations and update existing ones
float totalAssignedCapacity = 0f;
if (currentReservations != null) {
// first release all excess capacity in default queue
try {
scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
1.0f));
} catch (YarnException e) {
LOG.warn(
"Exception while trying to release default queue capacity for plan: {}",
planQueueName, e);
}
// sort allocations from the one giving up the most resources, to the
// one asking for the most
// avoid order-of-operation errors that temporarily violate 100%
// capacity bound
List<ReservationAllocation> sortedAllocations =
sortByDelta(
new ArrayList<ReservationAllocation>(currentReservations), now);
for (ReservationAllocation res : sortedAllocations) {
String currResId = res.getReservationId().toString();
if (curReservationNames.contains(currResId)) {
ReservationQueue resQueue =
new ReservationQueue(scheduler, currResId, planQueue);
try {
scheduler.addQueue(resQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
"Exception while trying to activate reservation: {} for plan: {}",
currResId, planQueueName, e);
}
}
Resource capToAssign = res.getResourcesAtTime(now);
float targetCapacity = 0f;
if (planResources.getMemory() > 0
&& planResources.getVirtualCores() > 0) {
targetCapacity =
Resources.divide(scheduler.getResourceCalculator(),
clusterResources, capToAssign, planResources);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Assigning capacity of {} to queue {} with target capacity {}",
capToAssign, currResId, targetCapacity);
}
// set maxCapacity to 100% unless the job requires gang, in which
// case we stick to capacity (as running early/before is likely a
// waste of resources)
float maxCapacity = 1.0f;
if (res.containsGangs()) {
maxCapacity = targetCapacity;
}
try {
scheduler.setEntitlement(currResId, new QueueEntitlement(
targetCapacity, maxCapacity));
} catch (YarnException e) {
LOG.warn("Exception while trying to size reservation for plan: {}",
currResId, planQueueName, e);
}
totalAssignedCapacity += targetCapacity;
}
}
// compute the default queue capacity
float defQCap = 1.0f - totalAssignedCapacity;
if (LOG.isDebugEnabled()) {
LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+ "currReservation: {} default-queue capacity: {}", planResources,
numRes, defQCap);
}
// set the default queue to eat-up all remaining capacity
try {
scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
defQCap, 1.0f));
} catch (YarnException e) {
LOG.warn(
"Exception while trying to reclaim default queue capacity for plan: {}",
planQueueName, e);
}
// garbage collect finished reservations from plan
try {
plan.archiveCompletedReservations(now);
} catch (PlanningException e) {
LOG.error("Exception in archiving completed reservations: ", e);
}
LOG.info("Finished iteration of plan follower edit policy for plan: "
+ planQueueName);
// Extension: update plan with app states,
// useful to support smart replanning
}
/**
* Move all apps in the set of queues to the parent plan queue's default
* reservation queue in a synchronous fashion
*/
private void moveAppsInQueueSync(String expiredReservation,
String defReservationQueue) {
List<ApplicationAttemptId> activeApps =
scheduler.getAppsInQueue(expiredReservation);
if (activeApps.isEmpty()) {
return;
}
for (ApplicationAttemptId app : activeApps) {
// fallback to parent's default queue
try {
scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
} catch (YarnException e) {
LOG.warn(
"Encountered unexpected error during migration of application: {} from reservation: {}",
app, expiredReservation, e);
}
}
}
/**
* First sets entitlement of queues to zero to prevent new app submission.
* Then move all apps in the set of queues to the parent plan queue's default
* reservation queue if move is enabled. Finally cleanups the queue by killing
* any apps (if move is disabled or move failed) and removing the queue
*/
private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
String defReservationQueue) {
for (String expiredReservation : toRemove) {
try {
// reduce entitlement to 0
scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
0.0f));
if (shouldMove) {
moveAppsInQueueSync(expiredReservation, defReservationQueue);
}
if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
scheduler.killAllAppsInQueue(expiredReservation);
LOG.info("Killing applications in queue: {}", expiredReservation);
} else {
scheduler.removeQueue(expiredReservation);
LOG.info("Queue: " + expiredReservation + " removed");
}
} catch (YarnException e) {
LOG.warn("Exception while trying to expire reservation: {}",
expiredReservation, e);
}
}
}
@Override
public synchronized void setPlans(Collection<Plan> plans) {
this.plans.clear();
this.plans.addAll(plans);
}
/**
* Sort in the order from the least new amount of resources asked (likely
* negative) to the highest. This prevents "order-of-operation" errors related
* to exceeding 100% capacity temporarily.
*/
private List<ReservationAllocation> sortByDelta(
List<ReservationAllocation> currentReservations, long now) {
Collections.sort(currentReservations, new ReservationAllocationComparator(
scheduler, now));
return currentReservations;
}
private class ReservationAllocationComparator implements
Comparator<ReservationAllocation> {
CapacityScheduler scheduler;
long now;
ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
this.scheduler = scheduler;
this.now = now;
}
private Resource getUnallocatedReservedResources(
ReservationAllocation reservation) {
Resource resResource;
CSQueue resQueue =
scheduler.getQueue(reservation.getReservationId().toString());
if (resQueue != null) {
resResource =
Resources.subtract(
reservation.getResourcesAtTime(now),
Resources.multiply(scheduler.getClusterResource(),
resQueue.getAbsoluteCapacity()));
} else {
resResource = reservation.getResourcesAtTime(now);
}
return resResource;
}
@Override
public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
// compute delta between current and previous reservation, and compare
// based on that
Resource lhsRes = getUnallocatedReservedResources(lhs);
Resource rhsRes = getUnallocatedReservedResources(rhs);
return lhsRes.compareTo(rhsRes);
}
}
}

View File

@ -0,0 +1,67 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Collection;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.Clock;
/**
* A PlanFollower is a component that runs on a timer, and synchronizes the
* underlying {@link ResourceScheduler} with the {@link Plan}(s) and viceversa.
*
* While different implementations might operate differently, the key idea is to
* map the current allocation of resources for each active reservation in the
* plan(s), to a corresponding notion in the underlying scheduler (e.g., tuning
* capacity of queues, set pool weights, or tweak application priorities). The
* goal is to affect the dynamic allocation of resources done by the scheduler
* so that the jobs obtain access to resources in a way that is consistent with
* the reservations in the plan. A key conceptual step here is to convert the
* absolute-valued promises made in the reservations to appropriate relative
* priorities/queue sizes etc.
*
* Symmetrically the PlanFollower exposes changes in cluster conditions (as
* tracked by the scheduler) to the plan, e.g., the overall amount of physical
* resources available. The Plan in turn can react by replanning its allocations
* if appropriate.
*
* The implementation can assume that is run frequently enough to be able to
* observe and react to normal operational changes in cluster conditions on the
* fly (e.g., if cluster resources drop, we can update the relative weights of a
* queue so that the absolute promises made to the job at reservation time are
* respected).
*
* However, due to RM restarts and the related downtime, it is advisable for
* implementations to operate in a stateless way, and be able to synchronize the
* state of plans/scheduler regardless of how big is the time gap between
* executions.
*/
public interface PlanFollower extends Runnable {
/**
* Init function that configures the PlanFollower, by providing:
*
* @param clock a reference to the system clock.
* @param sched a reference to the underlying scheduler
* @param plans references to the plans we should keep synchronized at every
* time tick.
*/
public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans);
/**
* The function performing the actual synchronization operation for a given
* Plan. This is normally invoked by the run method, but it can be invoked
* synchronously to avoid race conditions when a user's reservation request
* start time is imminent.
*
* @param plan the Plan to synchronize
*/
public void synchronizePlan(Plan plan);
/**
* Setter for the list of plans.
*
* @param plans the collection of Plans we operate on at every time tick.
*/
public void setPlans(Collection<Plan> plans);
}

View File

@ -0,0 +1,319 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Matchers;
import org.mockito.Mockito;
public class TestCapacitySchedulerPlanFollower {
final static int GB = 1024;
private Clock mClock = null;
private CapacityScheduler scheduler = null;
private RMContext rmContext;
private RMContext spyRMContext;
private CapacitySchedulerContext csContext;
private ReservationAgent mAgent;
private Plan plan;
private Resource minAlloc = Resource.newInstance(GB, 1);
private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
private ResourceCalculator res = new DefaultResourceCalculator();
private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
@Rule
public TestName name = new TestName();
@Before
public void setUp() throws Exception {
CapacityScheduler spyCs = new CapacityScheduler();
scheduler = spy(spyCs);
rmContext = TestUtils.getMockRMContext();
spyRMContext = spy(rmContext);
ConcurrentMap<ApplicationId, RMApp> spyApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
.thenReturn(null);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(csConf);
scheduler.setConf(csConf);
csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(csConf);
when(csContext.getMinimumResourceCapability()).thenReturn(minAlloc);
when(csContext.getMaximumResourceCapability()).thenReturn(maxAlloc);
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 32));
when(scheduler.getClusterResource()).thenReturn(
Resources.createResource(125 * GB, 125));
when(csContext.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(csConf);
containerTokenSecretManager.rollMasterKey();
when(csContext.getContainerTokenSecretManager()).thenReturn(
containerTokenSecretManager);
scheduler.setRMContext(spyRMContext);
scheduler.init(csConf);
scheduler.start();
setupPlanFollower();
}
private void setupPlanFollower() throws Exception {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
mClock = mock(Clock.class);
mAgent = mock(ReservationAgent.class);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
csConf.setReservationWindow(reservationQ, 20L);
csConf.setMaximumCapacity(reservationQ, 40);
csConf.setAverageCapacity(reservationQ, 20);
policy.init(reservationQ, csConf);
}
@Test
public void testWithMoveOnExpiry() throws PlanningException,
InterruptedException, AccessControlException {
// invoke plan follower test with move
testPlanFollower(true);
}
@Test
public void testWithKillOnExpiry() throws PlanningException,
InterruptedException, AccessControlException {
// invoke plan follower test with kill
testPlanFollower(false);
}
private void testPlanFollower(boolean isMove) throws PlanningException,
InterruptedException, AccessControlException {
// Initialize plan based on move flag
plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
scheduler.getClusterResource(), 1L, res,
scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
null, isMove);
// add a few reservations to the plan
long ts = System.currentTimeMillis();
ReservationId r1 = ReservationId.newInstance(ts, 1);
int[] f1 = { 10, 10, 10, 10, 10 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
.generateAllocation(0L, 1L, f1), res, minAlloc)));
ReservationId r2 = ReservationId.newInstance(ts, 2);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
.generateAllocation(3L, 1L, f1), res, minAlloc)));
ReservationId r3 = ReservationId.newInstance(ts, 3);
int[] f2 = { 0, 10, 20, 10, 0 };
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
.generateAllocation(10L, 1L, f2), res, minAlloc)));
CapacitySchedulerPlanFollower planFollower =
new CapacitySchedulerPlanFollower();
planFollower.init(mClock, scheduler, Collections.singletonList(plan));
when(mClock.getTime()).thenReturn(0L);
planFollower.run();
CSQueue defQ =
scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
CSQueue q = scheduler.getQueue(r1.toString());
assertNotNull(q);
// submit an app to r1
String user_0 = "test-user";
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId_0 =
ApplicationAttemptId.newInstance(appId, 0);
AppAddedSchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
scheduler.handle(addAppEvent);
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
scheduler.handle(appAttemptAddedEvent);
// initial default reservation queue should have no apps
Assert.assertEquals(0, defQ.getNumApplications());
Assert.assertEquals(0.1, q.getCapacity(), 0.01);
Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
Assert.assertEquals(1, q.getNumApplications());
CSQueue q2 = scheduler.getQueue(r2.toString());
assertNull(q2);
CSQueue q3 = scheduler.getQueue(r3.toString());
assertNull(q3);
when(mClock.getTime()).thenReturn(3L);
planFollower.run();
Assert.assertEquals(0, defQ.getNumApplications());
q = scheduler.getQueue(r1.toString());
assertNotNull(q);
Assert.assertEquals(0.1, q.getCapacity(), 0.01);
Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
Assert.assertEquals(1, q.getNumApplications());
q2 = scheduler.getQueue(r2.toString());
assertNotNull(q2);
Assert.assertEquals(0.1, q.getCapacity(), 0.01);
Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
q3 = scheduler.getQueue(r3.toString());
assertNull(q3);
when(mClock.getTime()).thenReturn(10L);
planFollower.run();
q = scheduler.getQueue(r1.toString());
if (isMove) {
// app should have been moved to default reservation queue
Assert.assertEquals(1, defQ.getNumApplications());
assertNull(q);
} else {
// app should be killed
Assert.assertEquals(0, defQ.getNumApplications());
assertNotNull(q);
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.KILLED, false);
scheduler.handle(appAttemptRemovedEvent);
}
q2 = scheduler.getQueue(r2.toString());
assertNull(q2);
q3 = scheduler.getQueue(r3.toString());
assertNotNull(q3);
Assert.assertEquals(0, q3.getCapacity(), 0.01);
Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
when(mClock.getTime()).thenReturn(11L);
planFollower.run();
if (isMove) {
// app should have been moved to default reservation queue
Assert.assertEquals(1, defQ.getNumApplications());
} else {
// app should be killed
Assert.assertEquals(0, defQ.getNumApplications());
}
q = scheduler.getQueue(r1.toString());
assertNull(q);
q2 = scheduler.getQueue(r2.toString());
assertNull(q2);
q3 = scheduler.getQueue(r3.toString());
assertNotNull(q3);
Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
when(mClock.getTime()).thenReturn(12L);
planFollower.run();
q = scheduler.getQueue(r1.toString());
assertNull(q);
q2 = scheduler.getQueue(r2.toString());
assertNull(q2);
q3 = scheduler.getQueue(r3.toString());
assertNotNull(q3);
Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
when(mClock.getTime()).thenReturn(16L);
planFollower.run();
q = scheduler.getQueue(r1.toString());
assertNull(q);
q2 = scheduler.getQueue(r2.toString());
assertNull(q2);
q3 = scheduler.getQueue(r3.toString());
assertNull(q3);
assertTrue(defQ.getCapacity() > 0.9);
}
public static ApplicationACLsManager mockAppACLsManager() {
Configuration conf = new Configuration();
return new ApplicationACLsManager(conf);
}
@After
public void tearDown() throws Exception {
if (scheduler != null) {
scheduler.stop();
}
}
}