From f2009dc89cbcb05be972ddd25b109dc72fefa444 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Tue, 20 Oct 2015 16:46:14 -0700 Subject: [PATCH] YARN-3985. Make ReservationSystem persist state using RMStateStore reservation APIs. (adhoot via asuresh) (cherry picked from commit 506d1b1dbcb7ae5dad4a3dc4d415af241c72887c) --- hadoop-yarn-project/CHANGES.txt | 2 + .../AbstractReservationSystem.java | 2 +- .../reservation/InMemoryPlan.java | 17 +- .../yarn/server/resourcemanager/MockRM.java | 7 + .../server/resourcemanager/RMHATestBase.java | 16 +- .../resourcemanager/TestClientRMService.java | 24 +- .../TestReservationSystemWithRMHA.java | 212 ++++++++++++++++++ .../ReservationSystemTestUtil.java | 26 +++ .../TestCapacityOverTimePolicy.java | 52 +++-- .../reservation/TestInMemoryPlan.java | 20 +- .../reservation/TestNoOverCommitPolicy.java | 34 ++- .../TestSchedulerPlanFollowerBase.java | 15 +- .../planning/TestAlignedPlanner.java | 9 +- .../planning/TestGreedyReservationAgent.java | 27 ++- .../planning/TestSimpleCapacityReplanner.java | 23 +- 15 files changed, 407 insertions(+), 79 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fe11b7bdbbe..8afc2d2bfd6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -470,6 +470,8 @@ Release 2.8.0 - UNRELEASED YARN-4267. Add additional logging to container launch implementations in container-executor. (Sidharta Seethana via vvasudev) + YARN-3985. Make ReservationSystem persist state using RMStateStore + reservation APIs. (adhoot via asuresh) OPTIMIZATIONS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index d2603c184d6..cf57dbe5f26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -349,7 +349,7 @@ public abstract class AbstractReservationSystem extends AbstractService getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation, maxAllocation, planQueueName, getReplanner(planQueuePath), getReservationSchedulerConfiguration() - .getMoveOnExpiry(planQueuePath)); + .getMoveOnExpiry(planQueuePath), rmContext); LOG.info("Intialized plan {0} based on reservable queue {1}", plan.toString(), planQueueName); return plan; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index abc9c989e59..a887e24296d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; @@ -53,6 +54,7 @@ public class InMemoryPlan implements Plan { private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + private final RMContext rmContext; private TreeMap> currentReservations = new TreeMap>(); @@ -85,15 +87,18 @@ public class InMemoryPlan implements Plan { public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, - String queueName, Planner replanner, boolean getMoveOnExpiry) { + String queueName, Planner replanner, boolean getMoveOnExpiry, + RMContext rmContext) { this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, - maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock()); + maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext, + new UTCClock()); } public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, - String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) { + String queueName, Planner replanner, boolean getMoveOnExpiry, + RMContext rmContext, Clock clock) { this.queueMetrics = queueMetrics; this.policy = policy; this.agent = agent; @@ -107,6 +112,7 @@ public class InMemoryPlan implements Plan { this.replanner = replanner; this.getMoveOnExpiry = getMoveOnExpiry; this.clock = clock; + this.rmContext = rmContext; } @Override @@ -211,6 +217,9 @@ public class InMemoryPlan implements Plan { currentReservations.put(searchInterval, reservations); reservationTable.put(inMemReservation.getReservationId(), inMemReservation); + rmContext.getStateStore().storeNewReservation( + ReservationSystemUtil.buildStateProto(inMemReservation), + getQueueName(), inMemReservation.getReservationId().toString()); incrementAllocation(inMemReservation); LOG.info("Sucessfully added reservation: {} to plan.", inMemReservation.getReservationId()); @@ -289,6 +298,8 @@ public class InMemoryPlan implements Plan { throw new IllegalArgumentException(errMsg); } reservationTable.remove(reservation.getReservationId()); + rmContext.getStateStore().removeReservation( + getQueueName(), reservation.getReservationId().toString()); decrementAllocation(reservation); LOG.info("Sucessfully deleted reservation: {} in plan.", reservation.getReservationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a0619cf0600..6923dd223fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; @@ -815,6 +816,12 @@ public class MockRM extends ResourceManager { return response.getApplicationReport(); } + public void updateReservationState(ReservationUpdateRequest request) + throws IOException, YarnException { + ApplicationClientProtocol client = getClientRMService(); + client.updateReservation(request); + } + // Explicitly reset queue metrics for testing. @SuppressWarnings("static-access") public void clearQueueMetrics(RMApp app) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 40b59ba5a75..6092f41705f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -106,8 +108,18 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{ } protected void startRMs() throws IOException { - rm1 = new MockRM(confForRM1, null, false); - rm2 = new MockRM(confForRM2, null, false); + rm1 = new MockRM(confForRM1, null, false){ + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + }; + rm2 = new MockRM(confForRM2, null, false){ + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + }; startRMs(rm1, confForRM1, rm2, confForRM2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 49b5b550c2e..a7219fa8772 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -103,8 +102,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; -import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -1115,7 +1112,8 @@ public class TestClientRMService { long duration = 60000; long deadline = (long) (arrival + 1.05 * duration); ReservationSubmissionRequest sRequest = - createSimpleReservationRequest(4, arrival, deadline, duration); + ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival, + deadline, duration); ReservationSubmissionResponse sResponse = null; try { sResponse = clientService.submitReservation(sRequest); @@ -1167,24 +1165,6 @@ public class TestClientRMService { rm = null; } - private ReservationSubmissionRequest createSimpleReservationRequest( - int numContainers, long arrival, long deadline, long duration) { - // create a request with a single atomic ask - ReservationRequest r = - ReservationRequest.newInstance(Resource.newInstance(1024, 1), - numContainers, 1, duration); - ReservationRequests reqs = - ReservationRequests.newInstance(Collections.singletonList(r), - ReservationRequestInterpreter.R_ALL); - ReservationDefinition rDef = - ReservationDefinition.newInstance(arrival, deadline, reqs, - "testClientRMService#reservation"); - ReservationSubmissionRequest request = - ReservationSubmissionRequest.newInstance(rDef, - ReservationSystemTestUtil.reservationQ); - return request; - } - @Test public void testGetNodeLabels() throws Exception { MockRM rm = new MockRM() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java new file mode 100644 index 00000000000..6f7413070b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class TestReservationSystemWithRMHA extends RMHATestBase{ + + @Override + public void setup() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + ReservationSystemTestUtil.setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); + configuration = conf; + + super.setup(); + } + + @Test + public void testSubmitReservationAndCheckAfterFailover() throws Exception { + startRMs(); + + addNodeCapacityToPlan(); + + ClientRMService clientService = rm1.getClientRMService(); + + // create a reservation + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + LOG.info("Submit reservation response: " + reservationID); + ReservationDefinition reservationDefinition = request + .getReservationDefinition(); + + // Do the failover + explicitFailover(); + + rm2.registerNode("127.0.0.1:1", 102400, 100); + + RMState state = rm2.getRMContext().getStateStore().loadState(); + Map reservationStateMap = + state.getReservationState().get(ReservationSystemTestUtil.reservationQ); + Assert.assertNotNull(reservationStateMap); + Assert.assertNotNull(reservationStateMap.get(reservationID)); + } + + + @Test + public void testUpdateReservationAndCheckAfterFailover() throws Exception { + startRMs(); + + addNodeCapacityToPlan(); + + ClientRMService clientService = rm1.getClientRMService(); + + // create a reservation + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + LOG.info("Submit reservation response: " + reservationID); + ReservationDefinition reservationDefinition = request + .getReservationDefinition(); + + + // Change any field + + long newDeadline = reservationDefinition.getDeadline() + 100; + reservationDefinition.setDeadline(newDeadline); + ReservationUpdateRequest updateRequest = + ReservationUpdateRequest.newInstance( + reservationDefinition, reservationID); + rm1.updateReservationState(updateRequest); + + // Do the failover + explicitFailover(); + + rm2.registerNode("127.0.0.1:1", 102400, 100); + + RMState state = rm2.getRMContext().getStateStore().loadState(); + Map reservationStateMap = + state.getReservationState().get(ReservationSystemTestUtil.reservationQ); + Assert.assertNotNull(reservationStateMap); + ReservationAllocationStateProto reservationState = + reservationStateMap.get(reservationID); + Assert.assertEquals(newDeadline, + reservationState.getReservationDefinition().getDeadline()); + } + + @Test + public void testDeleteReservationAndCheckAfterFailover() throws Exception { + startRMs(); + + addNodeCapacityToPlan(); + + ClientRMService clientService = rm1.getClientRMService(); + + // create a reservation + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + + + // Delete the reservation + ReservationDeleteRequest deleteRequest = + ReservationDeleteRequest.newInstance(reservationID); + clientService.deleteReservation(deleteRequest); + + // Do the failover + explicitFailover(); + + rm2.registerNode("127.0.0.1:1", 102400, 100); + + RMState state = rm2.getRMContext().getStateStore().loadState(); + Assert.assertNull(state.getReservationState().get( + ReservationSystemTestUtil.reservationQ)); + } + + private void addNodeCapacityToPlan() { + try { + rm1.registerNode("127.0.0.1:1", 102400, 100); + int attempts = 10; + do { + DrainDispatcher dispatcher = + (DrainDispatcher) rm1.getRMContext().getDispatcher(); + dispatcher.await(); + rm1.getRMContext().getReservationSystem().synchronizePlan( + ReservationSystemTestUtil.reservationQ); + if (rm1.getRMContext().getReservationSystem().getPlan + (ReservationSystemTestUtil.reservationQ).getTotalCapacity() + .getMemory() > 0) { + break; + } + LOG.info("Waiting for node capacity to be added to plan"); + Thread.sleep(100); + } + while (attempts-- > 0); + if (attempts <= 0) { + Assert.fail("Exhausted attempts in checking if node capacity was " + + "added to the plan"); + } + + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + private ReservationSubmissionRequest createReservationSubmissionRequest() { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival, + deadline, duration); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 954023b2e00..77291727c36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -32,6 +32,7 @@ import java.util.Random; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -189,6 +191,30 @@ public class ReservationSystemTestUtil { return rDef; } + public static ReservationSubmissionRequest createSimpleReservationRequest( + int numContainers, long arrival, long deadline, long duration) { + // create a request with a single atomic ask + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + numContainers, 1, duration); + ReservationRequests reqs = + ReservationRequests.newInstance(Collections.singletonList(r), + ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = + ReservationDefinition.newInstance(arrival, deadline, reqs, + "testClientRMService#reservation"); + ReservationSubmissionRequest request = + ReservationSubmissionRequest.newInstance(rDef, + reservationQ); + return request; + } + + public static RMContext createMockRMContext() { + RMContext context = mock(RMContext.class); + when(context.getStateStore()).thenReturn(new MemoryRMStateStore()); + return context; + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index f608c3ba01f..22ce6aa4dbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -24,8 +24,10 @@ import java.io.IOException; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; @@ -82,11 +84,12 @@ public class TestCapacityOverTimePolicy { instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); policy.init(reservationQ, conf); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource, step, res, minAlloc, maxAlloc, - "dedicated", null, true); + "dedicated", null, true, context); } public int[] generateData(int length, int val) { @@ -101,9 +104,13 @@ public class TestCapacityOverTimePolicy { public void testSimplePass() throws IOException, PlanningException { // generate allocation that simply fit within all constraints int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); + + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -115,9 +122,12 @@ public class TestCapacityOverTimePolicy { // fit within // max instantanesou int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -127,10 +137,13 @@ public class TestCapacityOverTimePolicy { public void testMultiTenantPass() throws IOException, PlanningException { // generate allocation from multiple tenants that barely fit in tot capacity int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); for (int i = 0; i < 4; i++) { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -141,10 +154,13 @@ public class TestCapacityOverTimePolicy { public void testMultiTenantFail() throws IOException, PlanningException { // generate allocation from multiple tenants that exceed tot capacity int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); for (int i = 0; i < 5; i++) { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -155,9 +171,12 @@ public class TestCapacityOverTimePolicy { public void testInstFail() throws IOException, PlanningException { // generate allocation that exceed the instantaneous cap single-show int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -168,23 +187,25 @@ public class TestCapacityOverTimePolicy { public void testInstFailBySum() throws IOException, PlanningException { // generate allocation that exceed the instantaneous cap by sum int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont)); - + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); try { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -205,10 +226,12 @@ public class TestCapacityOverTimePolicy { ReservationSystemUtil.toResource( ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont))); - + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + win, win); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + win, req, res, minAlloc))); } @@ -222,9 +245,12 @@ public class TestCapacityOverTimePolicy { req.put(new ReservationInterval(initTime, initTime + win), ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource .newInstance(1024, 1), cont))); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + win, win); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + win, req, res, minAlloc))); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java index b6d24b66b53..c6612710546 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; @@ -59,6 +60,7 @@ public class TestInMemoryPlan { private SharingPolicy policy; private ReservationAgent agent; private Planner replanner; + private RMContext context; @Before public void setUp() throws PlanningException { @@ -73,6 +75,8 @@ public class TestInMemoryPlan { replanner = mock(Planner.class); when(clock.getTime()).thenReturn(1L); + + context = ReservationSystemTestUtil.createMockRMContext(); } @After @@ -92,7 +96,7 @@ public class TestInMemoryPlan { public void testAddReservation() { Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); int[] alloc = { 10, 10, 10, 10, 10, 10 }; @@ -126,7 +130,7 @@ public class TestInMemoryPlan { public void testAddEmptyReservation() { Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); int[] alloc = {}; @@ -154,7 +158,7 @@ public class TestInMemoryPlan { // First add a reservation Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); int[] alloc = { 10, 10, 10, 10, 10, 10 }; @@ -199,7 +203,7 @@ public class TestInMemoryPlan { public void testUpdateReservation() { Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); // First add a reservation @@ -262,7 +266,7 @@ public class TestInMemoryPlan { public void testUpdateNonExistingReservation() { Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); // Try to update a reservation without adding @@ -295,7 +299,7 @@ public class TestInMemoryPlan { // First add a reservation Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); int[] alloc = { 10, 10, 10, 10, 10, 10 }; @@ -345,7 +349,7 @@ public class TestInMemoryPlan { public void testDeleteNonExistingReservation() { Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID = ReservationSystemTestUtil.getNewReservationId(); // Try to delete a reservation without adding @@ -365,7 +369,7 @@ public class TestInMemoryPlan { public void testArchiveCompletedReservations() { Plan plan = new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L, - resCalc, minAlloc, maxAlloc, planName, replanner, true); + resCalc, minAlloc, maxAlloc, planName, replanner, true, context); ReservationId reservationID1 = ReservationSystemTestUtil.getNewReservationId(); // First add a reservation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index 809892c06c1..a5f3fb4df67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -22,8 +22,10 @@ import static org.mockito.Mockito.mock; import java.io.IOException; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; @@ -67,11 +69,12 @@ public class TestNoOverCommitPolicy { (ReservationSchedulerConfiguration.class); NoOverCommitPolicy policy = new NoOverCommitPolicy(); policy.init(reservationQ, conf); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource, step, res, minAlloc, maxAlloc, - "dedicated", null, true); + "dedicated", null, true, context); } public int[] generateData(int length, int val) { @@ -86,9 +89,12 @@ public class TestNoOverCommitPolicy { public void testSingleUserEasyFitPass() throws IOException, PlanningException { // generate allocation that easily fit within resource constraints int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -99,9 +105,12 @@ public class TestNoOverCommitPolicy { PlanningException { // generate allocation from single tenant that barely fit int[] f = generateData(3600, totCont); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -121,14 +130,17 @@ public class TestNoOverCommitPolicy { public void testUserMismatch() throws IOException, PlanningException { // generate allocation from single tenant that exceed capacity int[] f = generateData(3600, (int) (0.5 * totCont)); - + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); ReservationId rid = ReservationSystemTestUtil.getNewReservationId(); - plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1", + + plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil .generateAllocation(initTime, step, f), res, minAlloc)); // trying to update a reservation with a mismatching user - plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2", + plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil .generateAllocation(initTime, step, f), res, minAlloc)); } @@ -137,10 +149,13 @@ public class TestNoOverCommitPolicy { public void testMultiTenantPass() throws IOException, PlanningException { // generate allocation from multiple tenants that barely fit in tot capacity int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); for (int i = 0; i < 4; i++) { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); @@ -151,10 +166,13 @@ public class TestNoOverCommitPolicy { public void testMultiTenantFail() throws IOException, PlanningException { // generate allocation from multiple tenants that exceed tot capacity int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime, initTime + f.length + 1, f.length); for (int i = 0; i < 5; i++) { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), res, minAlloc))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java index f5625fb27be..9689fce5ca5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java @@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -39,6 +41,7 @@ import org.junit.Assert; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public abstract class TestSchedulerPlanFollowerBase { @@ -51,6 +54,7 @@ public abstract class TestSchedulerPlanFollowerBase { protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); protected Plan plan; private ResourceCalculator res = new DefaultResourceCalculator(); + private RMContext context = ReservationSystemTestUtil.createMockRMContext(); protected void testPlanFollower(boolean isMove) throws PlanningException, InterruptedException, AccessControlException { @@ -59,27 +63,30 @@ public abstract class TestSchedulerPlanFollowerBase { new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, scheduler.getClusterResource(), 1L, res, scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated", - null, isMove); + null, isMove, context); // add a few reservations to the plan long ts = System.currentTimeMillis(); ReservationId r1 = ReservationId.newInstance(ts, 1); int[] f1 = { 10, 10, 10, 10, 10 }; + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + 0, 0 + f1.length + 1, f1.length); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3", + plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3", "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil .generateAllocation(0L, 1L, f1), res, minAlloc))); ReservationId r2 = ReservationId.newInstance(ts, 2); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3", + plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3", "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil .generateAllocation(3L, 1L, f1), res, minAlloc))); ReservationId r3 = ReservationId.newInstance(ts, 3); int[] f2 = { 0, 10, 20, 10, 0 }; assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4", + plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4", "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil .generateAllocation(10L, 1L, f2), res, minAlloc))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java index 9a1621a7aaa..01b7976434d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; @@ -724,6 +725,7 @@ public class TestAlignedPlanner { policy.init(reservationQ, conf); QueueMetrics queueMetrics = mock(QueueMetrics.class); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); // Set planning agent agent = new AlignedPlannerWithGreedy(); @@ -731,7 +733,7 @@ public class TestAlignedPlanner { // Create Plan plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, - res, minAlloc, maxAlloc, "dedicated", null, true); + res, minAlloc, maxAlloc, "dedicated", null, true, context); } private int initializeScenario1() throws PlanningException { @@ -782,9 +784,12 @@ public class TestAlignedPlanner { private void addFixedAllocation(long start, long step, int[] f) throws PlanningException { + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + f.length * step, f.length * step); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, + ReservationSystemTestUtil.getNewReservationId(), rDef, "user_fixed", "dedicated", start, start + f.length * step, ReservationSystemTestUtil.generateAllocation(start, step, f), res, minAlloc))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java index bd18a2f4543..f51cc75c3de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; @@ -89,9 +90,10 @@ public class TestGreedyReservationAgent { agent = new GreedyReservationAgent(); QueueMetrics queueMetrics = mock(QueueMetrics.class); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, - res, minAlloc, maxAlloc, "dedicated", null, true); + res, minAlloc, maxAlloc, "dedicated", null, true, context); } @SuppressWarnings("javadoc") @@ -141,9 +143,12 @@ public class TestGreedyReservationAgent { // create a completely utilized segment around time 30 int[] f = { 100, 100 }; + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + 30 * step, 30 * step + f.length * step, f.length * step); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 30 * step, 30 * step + f.length * step, ReservationSystemTestUtil.generateAllocation(30 * step, step, f), res, minAlloc))); @@ -195,10 +200,12 @@ public class TestGreedyReservationAgent { prepareBasicPlan(); // create a completely utilized segment at time 30 int[] f = { 100, 100 }; - + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + 30, 30 * step + f.length * step, f.length * step); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 30 * step, 30 * step + f.length * step, ReservationSystemTestUtil.generateAllocation(30 * step, step, f), res, minAlloc))); @@ -515,10 +522,12 @@ public class TestGreedyReservationAgent { // conditions for assignment that are non-empty int[] f = { 10, 10, 20, 20, 20, 10, 10 }; - + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + 0, 0 + f.length * step, f.length * step); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil .generateAllocation(0, step, f), res, minAlloc))); @@ -527,7 +536,7 @@ public class TestGreedyReservationAgent { ReservationSystemTestUtil.generateAllocation(5000, step, f2); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc))); System.out.println("--------BEFORE AGENT----------"); @@ -562,9 +571,11 @@ public class TestGreedyReservationAgent { instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); policy.init(reservationQ, conf); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, - clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); + clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, + true, context); int acc = 0; List list = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java index aeb1e6a760c..b6e6667f924 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java @@ -27,15 +27,18 @@ import static org.mockito.Mockito.when; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; @@ -67,6 +70,7 @@ public class TestSimpleCapacityReplanner { when(clock.getTime()).thenReturn(0L); SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock); + RMContext context = ReservationSystemTestUtil.createMockRMContext(); ReservationSchedulerConfiguration conf = mock(ReservationSchedulerConfiguration.class); when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L); @@ -76,52 +80,55 @@ public class TestSimpleCapacityReplanner { // Initialize the plan with more resources InMemoryPlan plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, - res, minAlloc, maxAlloc, "dedicated", enf, true, clock); + res, minAlloc, maxAlloc, "dedicated", enf, true, context, clock); // add reservation filling the plan (separating them 1ms, so we are sure // s2 follows s1 on acceptance long ts = System.currentTimeMillis(); ReservationId r1 = ReservationId.newInstance(ts, 1); int[] f5 = { 20, 20, 20, 20, 20 }; + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + 0, 0 + f5.length, f5.length); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3", + plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, minAlloc))); when(clock.getTime()).thenReturn(1L); ReservationId r2 = ReservationId.newInstance(ts, 2); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4", + plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, minAlloc))); when(clock.getTime()).thenReturn(2L); ReservationId r3 = ReservationId.newInstance(ts, 3); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5", + plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, minAlloc))); when(clock.getTime()).thenReturn(3L); ReservationId r4 = ReservationId.newInstance(ts, 4); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6", + plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, minAlloc))); when(clock.getTime()).thenReturn(4L); ReservationId r5 = ReservationId.newInstance(ts, 5); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7", + plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, minAlloc))); int[] f6 = { 50, 50, 50, 50, 50 }; ReservationId r6 = ReservationId.newInstance(ts, 6); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3", + plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3", "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res, minAlloc))); when(clock.getTime()).thenReturn(6L); ReservationId r7 = ReservationId.newInstance(ts, 7); assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4", + plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4", "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res, minAlloc)));