diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt index e9ec691a6e0..56b3c12d8fb 100644 --- a/YARN-1051-CHANGES.txt +++ b/YARN-1051-CHANGES.txt @@ -5,10 +5,10 @@ YARN-2475. Logic for responding to capacity drops for the ReservationSystem. (Carlo Curino and Subru Krishnan via curino) YARN-1708. Public YARN APIs for creating/updating/deleting -reservations. (Carlo Curino and Subru Krishnan via subru) +reservations. (Subru Krishnan and Carlo Curino via subru) YARN-1709. In-memory data structures used to track resources over -time to enable reservations. (Carlo Curino and Subru Krishnan via +time to enable reservations. (Subru Krishnan and Carlo Curino via subru) YARN-1710. Logic to find allocations within a Plan that satisfy @@ -17,3 +17,6 @@ curino) YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservations. (Carlo Curino and Subru Krishnan via curino) + +YARN-1712. Plan follower that synchronizes the current state of reservation +subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java new file mode 100644 index 00000000000..cfa172c4c6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements a {@link PlanFollower}. This is invoked on a timer, and + * it is in charge to publish the state of the {@link Plan}s to the underlying + * {@link CapacityScheduler}. This implementation does so, by + * adding/removing/resizing leaf queues in the scheduler, thus affecting the + * dynamic behavior of the scheduler in a way that is consistent with the + * content of the plan. It also updates the plan's view on how much resources + * are available in the cluster. + * + * This implementation of PlanFollower is relatively stateless, and it can + * synchronize schedulers and Plans that have arbitrary changes (performing set + * differences among existing queues). This makes it resilient to frequency of + * synchronization, and RM restart issues (no "catch up" is necessary). + */ +public class CapacitySchedulerPlanFollower implements PlanFollower { + + private static final Logger LOG = LoggerFactory + .getLogger(CapacitySchedulerPlanFollower.class); + + private Collection plans = new ArrayList(); + + private Clock clock; + private CapacityScheduler scheduler; + + @Override + public void init(Clock clock, ResourceScheduler sched, Collection plans) { + LOG.info("Initializing Plan Follower Policy:" + + this.getClass().getCanonicalName()); + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException( + "CapacitySchedulerPlanFollower can only work with CapacityScheduler"); + } + this.clock = clock; + this.scheduler = (CapacityScheduler) sched; + this.plans.addAll(plans); + } + + @Override + public synchronized void run() { + for (Plan plan : plans) { + synchronizePlan(plan); + } + } + + @Override + public synchronized void synchronizePlan(Plan plan) { + String planQueueName = plan.getQueueName(); + if (LOG.isDebugEnabled()) { + LOG.debug("Running plan follower edit policy for plan: " + planQueueName); + } + // align with plan step + long step = plan.getStep(); + long now = clock.getTime(); + if (now % step != 0) { + now += step - (now % step); + } + CSQueue queue = scheduler.getQueue(planQueueName); + if (!(queue instanceof PlanQueue)) { + LOG.error("The Plan is not an PlanQueue!"); + return; + } + PlanQueue planQueue = (PlanQueue) queue; + // first we publish to the plan the current availability of resources + Resource clusterResources = scheduler.getClusterResource(); + float planAbsCap = planQueue.getAbsoluteCapacity(); + Resource planResources = Resources.multiply(clusterResources, planAbsCap); + plan.setTotalCapacity(planResources); + + Set currentReservations = + plan.getReservationsAtTime(now); + Set curReservationNames = new HashSet(); + Resource reservedResources = Resource.newInstance(0, 0); + int numRes = 0; + if (currentReservations != null) { + numRes = currentReservations.size(); + for (ReservationAllocation reservation : currentReservations) { + curReservationNames.add(reservation.getReservationId().toString()); + Resources.addTo(reservedResources, reservation.getResourcesAtTime(now)); + } + } + // create the default reservation queue if it doesnt exist + String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + if (scheduler.getQueue(defReservationQueue) == null) { + ReservationQueue defQueue = + new ReservationQueue(scheduler, defReservationQueue, planQueue); + try { + scheduler.addQueue(defQueue); + } catch (SchedulerDynamicEditException e) { + LOG.warn( + "Exception while trying to create default reservation queue for plan: {}", + planQueueName, e); + } + } + curReservationNames.add(defReservationQueue); + // if the resources dedicated to this plan has shrunk invoke replanner + if (Resources.greaterThan(scheduler.getResourceCalculator(), + clusterResources, reservedResources, planResources)) { + try { + plan.getReplanner().plan(plan, null); + } catch (PlanningException e) { + LOG.warn("Exception while trying to replan: {}", planQueueName, e); + } + } + // identify the reservations that have expired and new reservations that + // have to be activated + List resQueues = planQueue.getChildQueues(); + Set expired = new HashSet(); + for (CSQueue resQueue : resQueues) { + String resQueueName = resQueue.getQueueName(); + if (curReservationNames.contains(resQueueName)) { + // it is already existing reservation, so needed not create new + // reservation queue + curReservationNames.remove(resQueueName); + } else { + // the reservation has termination, mark for cleanup + expired.add(resQueueName); + } + } + // garbage collect expired reservations + cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue); + + // Add new reservations and update existing ones + float totalAssignedCapacity = 0f; + if (currentReservations != null) { + // first release all excess capacity in default queue + try { + scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f, + 1.0f)); + } catch (YarnException e) { + LOG.warn( + "Exception while trying to release default queue capacity for plan: {}", + planQueueName, e); + } + // sort allocations from the one giving up the most resources, to the + // one asking for the most + // avoid order-of-operation errors that temporarily violate 100% + // capacity bound + List sortedAllocations = + sortByDelta( + new ArrayList(currentReservations), now); + for (ReservationAllocation res : sortedAllocations) { + String currResId = res.getReservationId().toString(); + if (curReservationNames.contains(currResId)) { + ReservationQueue resQueue = + new ReservationQueue(scheduler, currResId, planQueue); + try { + scheduler.addQueue(resQueue); + } catch (SchedulerDynamicEditException e) { + LOG.warn( + "Exception while trying to activate reservation: {} for plan: {}", + currResId, planQueueName, e); + } + } + Resource capToAssign = res.getResourcesAtTime(now); + float targetCapacity = 0f; + if (planResources.getMemory() > 0 + && planResources.getVirtualCores() > 0) { + targetCapacity = + Resources.divide(scheduler.getResourceCalculator(), + clusterResources, capToAssign, planResources); + } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Assigning capacity of {} to queue {} with target capacity {}", + capToAssign, currResId, targetCapacity); + } + // set maxCapacity to 100% unless the job requires gang, in which + // case we stick to capacity (as running early/before is likely a + // waste of resources) + float maxCapacity = 1.0f; + if (res.containsGangs()) { + maxCapacity = targetCapacity; + } + try { + scheduler.setEntitlement(currResId, new QueueEntitlement( + targetCapacity, maxCapacity)); + } catch (YarnException e) { + LOG.warn("Exception while trying to size reservation for plan: {}", + currResId, planQueueName, e); + } + totalAssignedCapacity += targetCapacity; + } + } + // compute the default queue capacity + float defQCap = 1.0f - totalAssignedCapacity; + if (LOG.isDebugEnabled()) { + LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} " + + "currReservation: {} default-queue capacity: {}", planResources, + numRes, defQCap); + } + // set the default queue to eat-up all remaining capacity + try { + scheduler.setEntitlement(defReservationQueue, new QueueEntitlement( + defQCap, 1.0f)); + } catch (YarnException e) { + LOG.warn( + "Exception while trying to reclaim default queue capacity for plan: {}", + planQueueName, e); + } + // garbage collect finished reservations from plan + try { + plan.archiveCompletedReservations(now); + } catch (PlanningException e) { + LOG.error("Exception in archiving completed reservations: ", e); + } + LOG.info("Finished iteration of plan follower edit policy for plan: " + + planQueueName); + + // Extension: update plan with app states, + // useful to support smart replanning + } + + /** + * Move all apps in the set of queues to the parent plan queue's default + * reservation queue in a synchronous fashion + */ + private void moveAppsInQueueSync(String expiredReservation, + String defReservationQueue) { + List activeApps = + scheduler.getAppsInQueue(expiredReservation); + if (activeApps.isEmpty()) { + return; + } + for (ApplicationAttemptId app : activeApps) { + // fallback to parent's default queue + try { + scheduler.moveApplication(app.getApplicationId(), defReservationQueue); + } catch (YarnException e) { + LOG.warn( + "Encountered unexpected error during migration of application: {} from reservation: {}", + app, expiredReservation, e); + } + } + } + + /** + * First sets entitlement of queues to zero to prevent new app submission. + * Then move all apps in the set of queues to the parent plan queue's default + * reservation queue if move is enabled. Finally cleanups the queue by killing + * any apps (if move is disabled or move failed) and removing the queue + */ + private void cleanupExpiredQueues(boolean shouldMove, Set toRemove, + String defReservationQueue) { + for (String expiredReservation : toRemove) { + try { + // reduce entitlement to 0 + scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f, + 0.0f)); + if (shouldMove) { + moveAppsInQueueSync(expiredReservation, defReservationQueue); + } + if (scheduler.getAppsInQueue(expiredReservation).size() > 0) { + scheduler.killAllAppsInQueue(expiredReservation); + LOG.info("Killing applications in queue: {}", expiredReservation); + } else { + scheduler.removeQueue(expiredReservation); + LOG.info("Queue: " + expiredReservation + " removed"); + } + } catch (YarnException e) { + LOG.warn("Exception while trying to expire reservation: {}", + expiredReservation, e); + } + } + } + + @Override + public synchronized void setPlans(Collection plans) { + this.plans.clear(); + this.plans.addAll(plans); + } + + /** + * Sort in the order from the least new amount of resources asked (likely + * negative) to the highest. This prevents "order-of-operation" errors related + * to exceeding 100% capacity temporarily. + */ + private List sortByDelta( + List currentReservations, long now) { + Collections.sort(currentReservations, new ReservationAllocationComparator( + scheduler, now)); + return currentReservations; + } + + private class ReservationAllocationComparator implements + Comparator { + CapacityScheduler scheduler; + long now; + + ReservationAllocationComparator(CapacityScheduler scheduler, long now) { + this.scheduler = scheduler; + this.now = now; + } + + private Resource getUnallocatedReservedResources( + ReservationAllocation reservation) { + Resource resResource; + CSQueue resQueue = + scheduler.getQueue(reservation.getReservationId().toString()); + if (resQueue != null) { + resResource = + Resources.subtract( + reservation.getResourcesAtTime(now), + Resources.multiply(scheduler.getClusterResource(), + resQueue.getAbsoluteCapacity())); + } else { + resResource = reservation.getResourcesAtTime(now); + } + return resResource; + } + + @Override + public int compare(ReservationAllocation lhs, ReservationAllocation rhs) { + // compute delta between current and previous reservation, and compare + // based on that + Resource lhsRes = getUnallocatedReservedResources(lhs); + Resource rhsRes = getUnallocatedReservedResources(rhs); + return lhsRes.compareTo(rhsRes); + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java new file mode 100644 index 00000000000..9d003666b01 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Collection; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.util.Clock; + +/** + * A PlanFollower is a component that runs on a timer, and synchronizes the + * underlying {@link ResourceScheduler} with the {@link Plan}(s) and viceversa. + * + * While different implementations might operate differently, the key idea is to + * map the current allocation of resources for each active reservation in the + * plan(s), to a corresponding notion in the underlying scheduler (e.g., tuning + * capacity of queues, set pool weights, or tweak application priorities). The + * goal is to affect the dynamic allocation of resources done by the scheduler + * so that the jobs obtain access to resources in a way that is consistent with + * the reservations in the plan. A key conceptual step here is to convert the + * absolute-valued promises made in the reservations to appropriate relative + * priorities/queue sizes etc. + * + * Symmetrically the PlanFollower exposes changes in cluster conditions (as + * tracked by the scheduler) to the plan, e.g., the overall amount of physical + * resources available. The Plan in turn can react by replanning its allocations + * if appropriate. + * + * The implementation can assume that is run frequently enough to be able to + * observe and react to normal operational changes in cluster conditions on the + * fly (e.g., if cluster resources drop, we can update the relative weights of a + * queue so that the absolute promises made to the job at reservation time are + * respected). + * + * However, due to RM restarts and the related downtime, it is advisable for + * implementations to operate in a stateless way, and be able to synchronize the + * state of plans/scheduler regardless of how big is the time gap between + * executions. + */ +public interface PlanFollower extends Runnable { + + /** + * Init function that configures the PlanFollower, by providing: + * + * @param clock a reference to the system clock. + * @param sched a reference to the underlying scheduler + * @param plans references to the plans we should keep synchronized at every + * time tick. + */ + public void init(Clock clock, ResourceScheduler sched, Collection plans); + + /** + * The function performing the actual synchronization operation for a given + * Plan. This is normally invoked by the run method, but it can be invoked + * synchronously to avoid race conditions when a user's reservation request + * start time is imminent. + * + * @param plan the Plan to synchronize + */ + public void synchronizePlan(Plan plan); + + /** + * Setter for the list of plans. + * + * @param plans the collection of Plans we operate on at every time tick. + */ + public void setPlans(Collection plans); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java new file mode 100644 index 00000000000..4eedd42cc66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Matchers; +import org.mockito.Mockito; + +public class TestCapacitySchedulerPlanFollower { + + final static int GB = 1024; + + private Clock mClock = null; + private CapacityScheduler scheduler = null; + private RMContext rmContext; + private RMContext spyRMContext; + private CapacitySchedulerContext csContext; + private ReservationAgent mAgent; + private Plan plan; + private Resource minAlloc = Resource.newInstance(GB, 1); + private Resource maxAlloc = Resource.newInstance(GB * 8, 8); + private ResourceCalculator res = new DefaultResourceCalculator(); + private CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws Exception { + CapacityScheduler spyCs = new CapacityScheduler(); + scheduler = spy(spyCs); + rmContext = TestUtils.getMockRMContext(); + spyRMContext = spy(rmContext); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any())) + .thenReturn(null); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + ReservationSystemTestUtil.setupQueueConfiguration(csConf); + + scheduler.setConf(csConf); + + csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(csConf); + when(csContext.getMinimumResourceCapability()).thenReturn(minAlloc); + when(csContext.getMaximumResourceCapability()).thenReturn(maxAlloc); + when(csContext.getClusterResource()).thenReturn( + Resources.createResource(100 * 16 * GB, 100 * 32)); + when(scheduler.getClusterResource()).thenReturn( + Resources.createResource(125 * GB, 125)); + when(csContext.getResourceCalculator()).thenReturn( + new DefaultResourceCalculator()); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(csConf); + containerTokenSecretManager.rollMasterKey(); + when(csContext.getContainerTokenSecretManager()).thenReturn( + containerTokenSecretManager); + + scheduler.setRMContext(spyRMContext); + scheduler.init(csConf); + scheduler.start(); + + setupPlanFollower(); + } + + private void setupPlanFollower() throws Exception { + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + mClock = mock(Clock.class); + mAgent = mock(ReservationAgent.class); + + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration csConf = scheduler.getConfiguration(); + csConf.setReservationWindow(reservationQ, 20L); + csConf.setMaximumCapacity(reservationQ, 40); + csConf.setAverageCapacity(reservationQ, 20); + policy.init(reservationQ, csConf); + } + + @Test + public void testWithMoveOnExpiry() throws PlanningException, + InterruptedException, AccessControlException { + // invoke plan follower test with move + testPlanFollower(true); + } + + @Test + public void testWithKillOnExpiry() throws PlanningException, + InterruptedException, AccessControlException { + // invoke plan follower test with kill + testPlanFollower(false); + } + + private void testPlanFollower(boolean isMove) throws PlanningException, + InterruptedException, AccessControlException { + // Initialize plan based on move flag + plan = + new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, + scheduler.getClusterResource(), 1L, res, + scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated", + null, isMove); + + // add a few reservations to the plan + long ts = System.currentTimeMillis(); + ReservationId r1 = ReservationId.newInstance(ts, 1); + int[] f1 = { 10, 10, 10, 10, 10 }; + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3", + "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil + .generateAllocation(0L, 1L, f1), res, minAlloc))); + + ReservationId r2 = ReservationId.newInstance(ts, 2); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3", + "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil + .generateAllocation(3L, 1L, f1), res, minAlloc))); + + ReservationId r3 = ReservationId.newInstance(ts, 3); + int[] f2 = { 0, 10, 20, 10, 0 }; + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4", + "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil + .generateAllocation(10L, 1L, f2), res, minAlloc))); + + CapacitySchedulerPlanFollower planFollower = + new CapacitySchedulerPlanFollower(); + planFollower.init(mClock, scheduler, Collections.singletonList(plan)); + + when(mClock.getTime()).thenReturn(0L); + planFollower.run(); + + CSQueue defQ = + scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX); + CSQueue q = scheduler.getQueue(r1.toString()); + assertNotNull(q); + // submit an app to r1 + String user_0 = "test-user"; + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId_0 = + ApplicationAttemptId.newInstance(appId, 0); + AppAddedSchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0); + scheduler.handle(addAppEvent); + AppAttemptAddedSchedulerEvent appAttemptAddedEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId_0, false); + scheduler.handle(appAttemptAddedEvent); + + // initial default reservation queue should have no apps + Assert.assertEquals(0, defQ.getNumApplications()); + + Assert.assertEquals(0.1, q.getCapacity(), 0.01); + Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0); + Assert.assertEquals(1, q.getNumApplications()); + + CSQueue q2 = scheduler.getQueue(r2.toString()); + assertNull(q2); + CSQueue q3 = scheduler.getQueue(r3.toString()); + assertNull(q3); + + when(mClock.getTime()).thenReturn(3L); + planFollower.run(); + + Assert.assertEquals(0, defQ.getNumApplications()); + q = scheduler.getQueue(r1.toString()); + assertNotNull(q); + Assert.assertEquals(0.1, q.getCapacity(), 0.01); + Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0); + Assert.assertEquals(1, q.getNumApplications()); + q2 = scheduler.getQueue(r2.toString()); + assertNotNull(q2); + Assert.assertEquals(0.1, q.getCapacity(), 0.01); + Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0); + q3 = scheduler.getQueue(r3.toString()); + assertNull(q3); + + when(mClock.getTime()).thenReturn(10L); + planFollower.run(); + + q = scheduler.getQueue(r1.toString()); + if (isMove) { + // app should have been moved to default reservation queue + Assert.assertEquals(1, defQ.getNumApplications()); + assertNull(q); + } else { + // app should be killed + Assert.assertEquals(0, defQ.getNumApplications()); + assertNotNull(q); + AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = + new AppAttemptRemovedSchedulerEvent(appAttemptId_0, + RMAppAttemptState.KILLED, false); + scheduler.handle(appAttemptRemovedEvent); + } + q2 = scheduler.getQueue(r2.toString()); + assertNull(q2); + q3 = scheduler.getQueue(r3.toString()); + assertNotNull(q3); + Assert.assertEquals(0, q3.getCapacity(), 0.01); + Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0); + + when(mClock.getTime()).thenReturn(11L); + planFollower.run(); + + if (isMove) { + // app should have been moved to default reservation queue + Assert.assertEquals(1, defQ.getNumApplications()); + } else { + // app should be killed + Assert.assertEquals(0, defQ.getNumApplications()); + } + q = scheduler.getQueue(r1.toString()); + assertNull(q); + q2 = scheduler.getQueue(r2.toString()); + assertNull(q2); + q3 = scheduler.getQueue(r3.toString()); + assertNotNull(q3); + Assert.assertEquals(0.1, q3.getCapacity(), 0.01); + Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0); + + when(mClock.getTime()).thenReturn(12L); + planFollower.run(); + + q = scheduler.getQueue(r1.toString()); + assertNull(q); + q2 = scheduler.getQueue(r2.toString()); + assertNull(q2); + q3 = scheduler.getQueue(r3.toString()); + assertNotNull(q3); + Assert.assertEquals(0.2, q3.getCapacity(), 0.01); + Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0); + + when(mClock.getTime()).thenReturn(16L); + planFollower.run(); + + q = scheduler.getQueue(r1.toString()); + assertNull(q); + q2 = scheduler.getQueue(r2.toString()); + assertNull(q2); + q3 = scheduler.getQueue(r3.toString()); + assertNull(q3); + + assertTrue(defQ.getCapacity() > 0.9); + + } + + public static ApplicationACLsManager mockAppACLsManager() { + Configuration conf = new Configuration(); + return new ApplicationACLsManager(conf); + } + + @After + public void tearDown() throws Exception { + if (scheduler != null) { + scheduler.stop(); + } + } + +}