YARN-3985. Make ReservationSystem persist state using RMStateStore reservation APIs. (adhoot via asuresh)
(cherry picked from commit 506d1b1dbc
)
This commit is contained in:
parent
e327233e80
commit
f2009dc89c
|
@ -470,6 +470,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4267. Add additional logging to container launch implementations in
|
YARN-4267. Add additional logging to container launch implementations in
|
||||||
container-executor. (Sidharta Seethana via vvasudev)
|
container-executor. (Sidharta Seethana via vvasudev)
|
||||||
|
|
||||||
|
YARN-3985. Make ReservationSystem persist state using RMStateStore
|
||||||
|
reservation APIs. (adhoot via asuresh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
|
|
@ -349,7 +349,7 @@ public abstract class AbstractReservationSystem extends AbstractService
|
||||||
getAgent(planQueuePath), totCap, planStepSize, rescCalc,
|
getAgent(planQueuePath), totCap, planStepSize, rescCalc,
|
||||||
minAllocation, maxAllocation, planQueueName,
|
minAllocation, maxAllocation, planQueueName,
|
||||||
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
|
getReplanner(planQueuePath), getReservationSchedulerConfiguration()
|
||||||
.getMoveOnExpiry(planQueuePath));
|
.getMoveOnExpiry(planQueuePath), rmContext);
|
||||||
LOG.info("Intialized plan {0} based on reservable queue {1}",
|
LOG.info("Intialized plan {0} based on reservable queue {1}",
|
||||||
plan.toString(), planQueueName);
|
plan.toString(), planQueueName);
|
||||||
return plan;
|
return plan;
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||||
|
@ -53,6 +54,7 @@ public class InMemoryPlan implements Plan {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
|
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
|
||||||
|
|
||||||
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
|
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
|
||||||
|
private final RMContext rmContext;
|
||||||
|
|
||||||
private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
|
private TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>> currentReservations =
|
||||||
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
|
new TreeMap<ReservationInterval, Set<InMemoryReservationAllocation>>();
|
||||||
|
@ -85,15 +87,18 @@ public class InMemoryPlan implements Plan {
|
||||||
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
|
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
|
||||||
ReservationAgent agent, Resource totalCapacity, long step,
|
ReservationAgent agent, Resource totalCapacity, long step,
|
||||||
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
|
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
|
||||||
String queueName, Planner replanner, boolean getMoveOnExpiry) {
|
String queueName, Planner replanner, boolean getMoveOnExpiry,
|
||||||
|
RMContext rmContext) {
|
||||||
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
|
this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc,
|
||||||
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
|
maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext,
|
||||||
|
new UTCClock());
|
||||||
}
|
}
|
||||||
|
|
||||||
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
|
public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
|
||||||
ReservationAgent agent, Resource totalCapacity, long step,
|
ReservationAgent agent, Resource totalCapacity, long step,
|
||||||
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
|
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
|
||||||
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {
|
String queueName, Planner replanner, boolean getMoveOnExpiry,
|
||||||
|
RMContext rmContext, Clock clock) {
|
||||||
this.queueMetrics = queueMetrics;
|
this.queueMetrics = queueMetrics;
|
||||||
this.policy = policy;
|
this.policy = policy;
|
||||||
this.agent = agent;
|
this.agent = agent;
|
||||||
|
@ -107,6 +112,7 @@ public class InMemoryPlan implements Plan {
|
||||||
this.replanner = replanner;
|
this.replanner = replanner;
|
||||||
this.getMoveOnExpiry = getMoveOnExpiry;
|
this.getMoveOnExpiry = getMoveOnExpiry;
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -211,6 +217,9 @@ public class InMemoryPlan implements Plan {
|
||||||
currentReservations.put(searchInterval, reservations);
|
currentReservations.put(searchInterval, reservations);
|
||||||
reservationTable.put(inMemReservation.getReservationId(),
|
reservationTable.put(inMemReservation.getReservationId(),
|
||||||
inMemReservation);
|
inMemReservation);
|
||||||
|
rmContext.getStateStore().storeNewReservation(
|
||||||
|
ReservationSystemUtil.buildStateProto(inMemReservation),
|
||||||
|
getQueueName(), inMemReservation.getReservationId().toString());
|
||||||
incrementAllocation(inMemReservation);
|
incrementAllocation(inMemReservation);
|
||||||
LOG.info("Sucessfully added reservation: {} to plan.",
|
LOG.info("Sucessfully added reservation: {} to plan.",
|
||||||
inMemReservation.getReservationId());
|
inMemReservation.getReservationId());
|
||||||
|
@ -289,6 +298,8 @@ public class InMemoryPlan implements Plan {
|
||||||
throw new IllegalArgumentException(errMsg);
|
throw new IllegalArgumentException(errMsg);
|
||||||
}
|
}
|
||||||
reservationTable.remove(reservation.getReservationId());
|
reservationTable.remove(reservation.getReservationId());
|
||||||
|
rmContext.getStateStore().removeReservation(
|
||||||
|
getQueueName(), reservation.getReservationId().toString());
|
||||||
decrementAllocation(reservation);
|
decrementAllocation(reservation);
|
||||||
LOG.info("Sucessfully deleted reservation: {} in plan.",
|
LOG.info("Sucessfully deleted reservation: {} in plan.",
|
||||||
reservation.getReservationId());
|
reservation.getReservationId());
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||||
|
@ -815,6 +816,12 @@ public class MockRM extends ResourceManager {
|
||||||
return response.getApplicationReport();
|
return response.getApplicationReport();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateReservationState(ReservationUpdateRequest request)
|
||||||
|
throws IOException, YarnException {
|
||||||
|
ApplicationClientProtocol client = getClientRMService();
|
||||||
|
client.updateReservation(request);
|
||||||
|
}
|
||||||
|
|
||||||
// Explicitly reset queue metrics for testing.
|
// Explicitly reset queue metrics for testing.
|
||||||
@SuppressWarnings("static-access")
|
@SuppressWarnings("static-access")
|
||||||
public void clearQueueMetrics(RMApp app) {
|
public void clearQueueMetrics(RMApp app) {
|
||||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
@ -106,8 +108,18 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void startRMs() throws IOException {
|
protected void startRMs() throws IOException {
|
||||||
rm1 = new MockRM(confForRM1, null, false);
|
rm1 = new MockRM(confForRM1, null, false){
|
||||||
rm2 = new MockRM(confForRM2, null, false);
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return new DrainDispatcher();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm2 = new MockRM(confForRM2, null, false){
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return new DrainDispatcher();
|
||||||
|
}
|
||||||
|
};
|
||||||
startRMs(rm1, confForRM1, rm2, confForRM2);
|
startRMs(rm1, confForRM1, rm2, confForRM2);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -103,8 +102,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
@ -1115,7 +1112,8 @@ public class TestClientRMService {
|
||||||
long duration = 60000;
|
long duration = 60000;
|
||||||
long deadline = (long) (arrival + 1.05 * duration);
|
long deadline = (long) (arrival + 1.05 * duration);
|
||||||
ReservationSubmissionRequest sRequest =
|
ReservationSubmissionRequest sRequest =
|
||||||
createSimpleReservationRequest(4, arrival, deadline, duration);
|
ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
|
||||||
|
deadline, duration);
|
||||||
ReservationSubmissionResponse sResponse = null;
|
ReservationSubmissionResponse sResponse = null;
|
||||||
try {
|
try {
|
||||||
sResponse = clientService.submitReservation(sRequest);
|
sResponse = clientService.submitReservation(sRequest);
|
||||||
|
@ -1167,24 +1165,6 @@ public class TestClientRMService {
|
||||||
rm = null;
|
rm = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ReservationSubmissionRequest createSimpleReservationRequest(
|
|
||||||
int numContainers, long arrival, long deadline, long duration) {
|
|
||||||
// create a request with a single atomic ask
|
|
||||||
ReservationRequest r =
|
|
||||||
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
|
||||||
numContainers, 1, duration);
|
|
||||||
ReservationRequests reqs =
|
|
||||||
ReservationRequests.newInstance(Collections.singletonList(r),
|
|
||||||
ReservationRequestInterpreter.R_ALL);
|
|
||||||
ReservationDefinition rDef =
|
|
||||||
ReservationDefinition.newInstance(arrival, deadline, reqs,
|
|
||||||
"testClientRMService#reservation");
|
|
||||||
ReservationSubmissionRequest request =
|
|
||||||
ReservationSubmissionRequest.newInstance(rDef,
|
|
||||||
ReservationSystemTestUtil.reservationQ);
|
|
||||||
return request;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetNodeLabels() throws Exception {
|
public void testGetNodeLabels() throws Exception {
|
||||||
MockRM rm = new MockRM() {
|
MockRM rm = new MockRM() {
|
||||||
|
|
|
@ -0,0 +1,212 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
import org.apache.hadoop.yarn.util.UTCClock;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class TestReservationSystemWithRMHA extends RMHATestBase{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
ReservationSystemTestUtil.setupQueueConfiguration(conf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
|
||||||
|
configuration = conf;
|
||||||
|
|
||||||
|
super.setup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitReservationAndCheckAfterFailover() throws Exception {
|
||||||
|
startRMs();
|
||||||
|
|
||||||
|
addNodeCapacityToPlan();
|
||||||
|
|
||||||
|
ClientRMService clientService = rm1.getClientRMService();
|
||||||
|
|
||||||
|
// create a reservation
|
||||||
|
ReservationSubmissionRequest request = createReservationSubmissionRequest();
|
||||||
|
ReservationSubmissionResponse response = null;
|
||||||
|
try {
|
||||||
|
response = clientService.submitReservation(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
ReservationId reservationID = response.getReservationId();
|
||||||
|
Assert.assertNotNull(reservationID);
|
||||||
|
LOG.info("Submit reservation response: " + reservationID);
|
||||||
|
ReservationDefinition reservationDefinition = request
|
||||||
|
.getReservationDefinition();
|
||||||
|
|
||||||
|
// Do the failover
|
||||||
|
explicitFailover();
|
||||||
|
|
||||||
|
rm2.registerNode("127.0.0.1:1", 102400, 100);
|
||||||
|
|
||||||
|
RMState state = rm2.getRMContext().getStateStore().loadState();
|
||||||
|
Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
|
||||||
|
state.getReservationState().get(ReservationSystemTestUtil.reservationQ);
|
||||||
|
Assert.assertNotNull(reservationStateMap);
|
||||||
|
Assert.assertNotNull(reservationStateMap.get(reservationID));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateReservationAndCheckAfterFailover() throws Exception {
|
||||||
|
startRMs();
|
||||||
|
|
||||||
|
addNodeCapacityToPlan();
|
||||||
|
|
||||||
|
ClientRMService clientService = rm1.getClientRMService();
|
||||||
|
|
||||||
|
// create a reservation
|
||||||
|
ReservationSubmissionRequest request = createReservationSubmissionRequest();
|
||||||
|
ReservationSubmissionResponse response = null;
|
||||||
|
try {
|
||||||
|
response = clientService.submitReservation(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
ReservationId reservationID = response.getReservationId();
|
||||||
|
Assert.assertNotNull(reservationID);
|
||||||
|
LOG.info("Submit reservation response: " + reservationID);
|
||||||
|
ReservationDefinition reservationDefinition = request
|
||||||
|
.getReservationDefinition();
|
||||||
|
|
||||||
|
|
||||||
|
// Change any field
|
||||||
|
|
||||||
|
long newDeadline = reservationDefinition.getDeadline() + 100;
|
||||||
|
reservationDefinition.setDeadline(newDeadline);
|
||||||
|
ReservationUpdateRequest updateRequest =
|
||||||
|
ReservationUpdateRequest.newInstance(
|
||||||
|
reservationDefinition, reservationID);
|
||||||
|
rm1.updateReservationState(updateRequest);
|
||||||
|
|
||||||
|
// Do the failover
|
||||||
|
explicitFailover();
|
||||||
|
|
||||||
|
rm2.registerNode("127.0.0.1:1", 102400, 100);
|
||||||
|
|
||||||
|
RMState state = rm2.getRMContext().getStateStore().loadState();
|
||||||
|
Map<ReservationId, ReservationAllocationStateProto> reservationStateMap =
|
||||||
|
state.getReservationState().get(ReservationSystemTestUtil.reservationQ);
|
||||||
|
Assert.assertNotNull(reservationStateMap);
|
||||||
|
ReservationAllocationStateProto reservationState =
|
||||||
|
reservationStateMap.get(reservationID);
|
||||||
|
Assert.assertEquals(newDeadline,
|
||||||
|
reservationState.getReservationDefinition().getDeadline());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteReservationAndCheckAfterFailover() throws Exception {
|
||||||
|
startRMs();
|
||||||
|
|
||||||
|
addNodeCapacityToPlan();
|
||||||
|
|
||||||
|
ClientRMService clientService = rm1.getClientRMService();
|
||||||
|
|
||||||
|
// create a reservation
|
||||||
|
ReservationSubmissionRequest request = createReservationSubmissionRequest();
|
||||||
|
ReservationSubmissionResponse response = null;
|
||||||
|
try {
|
||||||
|
response = clientService.submitReservation(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(response);
|
||||||
|
ReservationId reservationID = response.getReservationId();
|
||||||
|
Assert.assertNotNull(reservationID);
|
||||||
|
|
||||||
|
|
||||||
|
// Delete the reservation
|
||||||
|
ReservationDeleteRequest deleteRequest =
|
||||||
|
ReservationDeleteRequest.newInstance(reservationID);
|
||||||
|
clientService.deleteReservation(deleteRequest);
|
||||||
|
|
||||||
|
// Do the failover
|
||||||
|
explicitFailover();
|
||||||
|
|
||||||
|
rm2.registerNode("127.0.0.1:1", 102400, 100);
|
||||||
|
|
||||||
|
RMState state = rm2.getRMContext().getStateStore().loadState();
|
||||||
|
Assert.assertNull(state.getReservationState().get(
|
||||||
|
ReservationSystemTestUtil.reservationQ));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addNodeCapacityToPlan() {
|
||||||
|
try {
|
||||||
|
rm1.registerNode("127.0.0.1:1", 102400, 100);
|
||||||
|
int attempts = 10;
|
||||||
|
do {
|
||||||
|
DrainDispatcher dispatcher =
|
||||||
|
(DrainDispatcher) rm1.getRMContext().getDispatcher();
|
||||||
|
dispatcher.await();
|
||||||
|
rm1.getRMContext().getReservationSystem().synchronizePlan(
|
||||||
|
ReservationSystemTestUtil.reservationQ);
|
||||||
|
if (rm1.getRMContext().getReservationSystem().getPlan
|
||||||
|
(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
|
||||||
|
.getMemory() > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for node capacity to be added to plan");
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
while (attempts-- > 0);
|
||||||
|
if (attempts <= 0) {
|
||||||
|
Assert.fail("Exhausted attempts in checking if node capacity was " +
|
||||||
|
"added to the plan");
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReservationSubmissionRequest createReservationSubmissionRequest() {
|
||||||
|
Clock clock = new UTCClock();
|
||||||
|
long arrival = clock.getTime();
|
||||||
|
long duration = 60000;
|
||||||
|
long deadline = (long) (arrival + 1.05 * duration);
|
||||||
|
return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
|
||||||
|
deadline, duration);
|
||||||
|
}
|
||||||
|
}
|
|
@ -32,6 +32,7 @@ import java.util.Random;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
|
@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
|
@ -189,6 +191,30 @@ public class ReservationSystemTestUtil {
|
||||||
return rDef;
|
return rDef;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ReservationSubmissionRequest createSimpleReservationRequest(
|
||||||
|
int numContainers, long arrival, long deadline, long duration) {
|
||||||
|
// create a request with a single atomic ask
|
||||||
|
ReservationRequest r =
|
||||||
|
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
||||||
|
numContainers, 1, duration);
|
||||||
|
ReservationRequests reqs =
|
||||||
|
ReservationRequests.newInstance(Collections.singletonList(r),
|
||||||
|
ReservationRequestInterpreter.R_ALL);
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationDefinition.newInstance(arrival, deadline, reqs,
|
||||||
|
"testClientRMService#reservation");
|
||||||
|
ReservationSubmissionRequest request =
|
||||||
|
ReservationSubmissionRequest.newInstance(rDef,
|
||||||
|
reservationQ);
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RMContext createMockRMContext() {
|
||||||
|
RMContext context = mock(RMContext.class);
|
||||||
|
when(context.getStateStore()).thenReturn(new MemoryRMStateStore());
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public CapacityScheduler mockCapacityScheduler(int numContainers)
|
public CapacityScheduler mockCapacityScheduler(int numContainers)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -24,8 +24,10 @@ import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||||
|
@ -82,11 +84,12 @@ public class TestCapacityOverTimePolicy {
|
||||||
instConstraint, avgConstraint);
|
instConstraint, avgConstraint);
|
||||||
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||||
policy.init(reservationQ, conf);
|
policy.init(reservationQ, conf);
|
||||||
|
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
|
|
||||||
plan =
|
plan =
|
||||||
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
|
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
|
||||||
clusterResource, step, res, minAlloc, maxAlloc,
|
clusterResource, step, res, minAlloc, maxAlloc,
|
||||||
"dedicated", null, true);
|
"dedicated", null, true, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int[] generateData(int length, int val) {
|
public int[] generateData(int length, int val) {
|
||||||
|
@ -101,9 +104,13 @@ public class TestCapacityOverTimePolicy {
|
||||||
public void testSimplePass() throws IOException, PlanningException {
|
public void testSimplePass() throws IOException, PlanningException {
|
||||||
// generate allocation that simply fit within all constraints
|
// generate allocation that simply fit within all constraints
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
||||||
|
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -115,9 +122,12 @@ public class TestCapacityOverTimePolicy {
|
||||||
// fit within
|
// fit within
|
||||||
// max instantanesou
|
// max instantanesou
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -127,10 +137,13 @@ public class TestCapacityOverTimePolicy {
|
||||||
public void testMultiTenantPass() throws IOException, PlanningException {
|
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||||
// generate allocation from multiple tenants that barely fit in tot capacity
|
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -141,10 +154,13 @@ public class TestCapacityOverTimePolicy {
|
||||||
public void testMultiTenantFail() throws IOException, PlanningException {
|
public void testMultiTenantFail() throws IOException, PlanningException {
|
||||||
// generate allocation from multiple tenants that exceed tot capacity
|
// generate allocation from multiple tenants that exceed tot capacity
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -155,9 +171,12 @@ public class TestCapacityOverTimePolicy {
|
||||||
public void testInstFail() throws IOException, PlanningException {
|
public void testInstFail() throws IOException, PlanningException {
|
||||||
// generate allocation that exceed the instantaneous cap single-show
|
// generate allocation that exceed the instantaneous cap single-show
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -168,23 +187,25 @@ public class TestCapacityOverTimePolicy {
|
||||||
public void testInstFailBySum() throws IOException, PlanningException {
|
public void testInstFailBySum() throws IOException, PlanningException {
|
||||||
// generate allocation that exceed the instantaneous cap by sum
|
// generate allocation that exceed the instantaneous cap by sum
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
try {
|
try {
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -205,10 +226,12 @@ public class TestCapacityOverTimePolicy {
|
||||||
ReservationSystemUtil.toResource(
|
ReservationSystemUtil.toResource(
|
||||||
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
||||||
cont)));
|
cont)));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + win, win);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,9 +245,12 @@ public class TestCapacityOverTimePolicy {
|
||||||
req.put(new ReservationInterval(initTime, initTime + win),
|
req.put(new ReservationInterval(initTime, initTime + win),
|
||||||
ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
|
ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
|
||||||
.newInstance(1024, 1), cont)));
|
.newInstance(1024, 1), cont)));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + win, win);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||||
|
@ -59,6 +60,7 @@ public class TestInMemoryPlan {
|
||||||
private SharingPolicy policy;
|
private SharingPolicy policy;
|
||||||
private ReservationAgent agent;
|
private ReservationAgent agent;
|
||||||
private Planner replanner;
|
private Planner replanner;
|
||||||
|
private RMContext context;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws PlanningException {
|
public void setUp() throws PlanningException {
|
||||||
|
@ -73,6 +75,8 @@ public class TestInMemoryPlan {
|
||||||
replanner = mock(Planner.class);
|
replanner = mock(Planner.class);
|
||||||
|
|
||||||
when(clock.getTime()).thenReturn(1L);
|
when(clock.getTime()).thenReturn(1L);
|
||||||
|
|
||||||
|
context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -92,7 +96,7 @@ public class TestInMemoryPlan {
|
||||||
public void testAddReservation() {
|
public void testAddReservation() {
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
||||||
|
@ -126,7 +130,7 @@ public class TestInMemoryPlan {
|
||||||
public void testAddEmptyReservation() {
|
public void testAddEmptyReservation() {
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
int[] alloc = {};
|
int[] alloc = {};
|
||||||
|
@ -154,7 +158,7 @@ public class TestInMemoryPlan {
|
||||||
// First add a reservation
|
// First add a reservation
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
||||||
|
@ -199,7 +203,7 @@ public class TestInMemoryPlan {
|
||||||
public void testUpdateReservation() {
|
public void testUpdateReservation() {
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
// First add a reservation
|
// First add a reservation
|
||||||
|
@ -262,7 +266,7 @@ public class TestInMemoryPlan {
|
||||||
public void testUpdateNonExistingReservation() {
|
public void testUpdateNonExistingReservation() {
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
// Try to update a reservation without adding
|
// Try to update a reservation without adding
|
||||||
|
@ -295,7 +299,7 @@ public class TestInMemoryPlan {
|
||||||
// First add a reservation
|
// First add a reservation
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
int[] alloc = { 10, 10, 10, 10, 10, 10 };
|
||||||
|
@ -345,7 +349,7 @@ public class TestInMemoryPlan {
|
||||||
public void testDeleteNonExistingReservation() {
|
public void testDeleteNonExistingReservation() {
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID =
|
ReservationId reservationID =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
// Try to delete a reservation without adding
|
// Try to delete a reservation without adding
|
||||||
|
@ -365,7 +369,7 @@ public class TestInMemoryPlan {
|
||||||
public void testArchiveCompletedReservations() {
|
public void testArchiveCompletedReservations() {
|
||||||
Plan plan =
|
Plan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
|
||||||
resCalc, minAlloc, maxAlloc, planName, replanner, true);
|
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
|
||||||
ReservationId reservationID1 =
|
ReservationId reservationID1 =
|
||||||
ReservationSystemTestUtil.getNewReservationId();
|
ReservationSystemTestUtil.getNewReservationId();
|
||||||
// First add a reservation
|
// First add a reservation
|
||||||
|
|
|
@ -22,8 +22,10 @@ import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||||
|
@ -67,11 +69,12 @@ public class TestNoOverCommitPolicy {
|
||||||
(ReservationSchedulerConfiguration.class);
|
(ReservationSchedulerConfiguration.class);
|
||||||
NoOverCommitPolicy policy = new NoOverCommitPolicy();
|
NoOverCommitPolicy policy = new NoOverCommitPolicy();
|
||||||
policy.init(reservationQ, conf);
|
policy.init(reservationQ, conf);
|
||||||
|
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
|
|
||||||
plan =
|
plan =
|
||||||
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
|
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
|
||||||
clusterResource, step, res, minAlloc, maxAlloc,
|
clusterResource, step, res, minAlloc, maxAlloc,
|
||||||
"dedicated", null, true);
|
"dedicated", null, true, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int[] generateData(int length, int val) {
|
public int[] generateData(int length, int val) {
|
||||||
|
@ -86,9 +89,12 @@ public class TestNoOverCommitPolicy {
|
||||||
public void testSingleUserEasyFitPass() throws IOException, PlanningException {
|
public void testSingleUserEasyFitPass() throws IOException, PlanningException {
|
||||||
// generate allocation that easily fit within resource constraints
|
// generate allocation that easily fit within resource constraints
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -99,9 +105,12 @@ public class TestNoOverCommitPolicy {
|
||||||
PlanningException {
|
PlanningException {
|
||||||
// generate allocation from single tenant that barely fit
|
// generate allocation from single tenant that barely fit
|
||||||
int[] f = generateData(3600, totCont);
|
int[] f = generateData(3600, totCont);
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -121,14 +130,17 @@ public class TestNoOverCommitPolicy {
|
||||||
public void testUserMismatch() throws IOException, PlanningException {
|
public void testUserMismatch() throws IOException, PlanningException {
|
||||||
// generate allocation from single tenant that exceed capacity
|
// generate allocation from single tenant that exceed capacity
|
||||||
int[] f = generateData(3600, (int) (0.5 * totCont));
|
int[] f = generateData(3600, (int) (0.5 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
|
ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
|
||||||
plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1",
|
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1",
|
||||||
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||||
.generateAllocation(initTime, step, f), res, minAlloc));
|
.generateAllocation(initTime, step, f), res, minAlloc));
|
||||||
|
|
||||||
// trying to update a reservation with a mismatching user
|
// trying to update a reservation with a mismatching user
|
||||||
plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2",
|
plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2",
|
||||||
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||||
.generateAllocation(initTime, step, f), res, minAlloc));
|
.generateAllocation(initTime, step, f), res, minAlloc));
|
||||||
}
|
}
|
||||||
|
@ -137,10 +149,13 @@ public class TestNoOverCommitPolicy {
|
||||||
public void testMultiTenantPass() throws IOException, PlanningException {
|
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||||
// generate allocation from multiple tenants that barely fit in tot capacity
|
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -151,10 +166,13 @@ public class TestNoOverCommitPolicy {
|
||||||
public void testMultiTenantFail() throws IOException, PlanningException {
|
public void testMultiTenantFail() throws IOException, PlanningException {
|
||||||
// generate allocation from multiple tenants that exceed tot capacity
|
// generate allocation from multiple tenants that exceed tot capacity
|
||||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
initTime, initTime + f.length + 1, f.length);
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||||
"dedicated", initTime, initTime + f.length,
|
"dedicated", initTime, initTime + f.length,
|
||||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
|
|
@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -39,6 +41,7 @@ import org.junit.Assert;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public abstract class TestSchedulerPlanFollowerBase {
|
public abstract class TestSchedulerPlanFollowerBase {
|
||||||
|
@ -51,6 +54,7 @@ public abstract class TestSchedulerPlanFollowerBase {
|
||||||
protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||||
protected Plan plan;
|
protected Plan plan;
|
||||||
private ResourceCalculator res = new DefaultResourceCalculator();
|
private ResourceCalculator res = new DefaultResourceCalculator();
|
||||||
|
private RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
|
|
||||||
protected void testPlanFollower(boolean isMove) throws PlanningException,
|
protected void testPlanFollower(boolean isMove) throws PlanningException,
|
||||||
InterruptedException, AccessControlException {
|
InterruptedException, AccessControlException {
|
||||||
|
@ -59,27 +63,30 @@ public abstract class TestSchedulerPlanFollowerBase {
|
||||||
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
|
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
|
||||||
scheduler.getClusterResource(), 1L, res,
|
scheduler.getClusterResource(), 1L, res,
|
||||||
scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
|
scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
|
||||||
null, isMove);
|
null, isMove, context);
|
||||||
|
|
||||||
// add a few reservations to the plan
|
// add a few reservations to the plan
|
||||||
long ts = System.currentTimeMillis();
|
long ts = System.currentTimeMillis();
|
||||||
ReservationId r1 = ReservationId.newInstance(ts, 1);
|
ReservationId r1 = ReservationId.newInstance(ts, 1);
|
||||||
int[] f1 = { 10, 10, 10, 10, 10 };
|
int[] f1 = { 10, 10, 10, 10, 10 };
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
0, 0 + f1.length + 1, f1.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
|
plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
|
||||||
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
|
"dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
|
||||||
.generateAllocation(0L, 1L, f1), res, minAlloc)));
|
.generateAllocation(0L, 1L, f1), res, minAlloc)));
|
||||||
|
|
||||||
ReservationId r2 = ReservationId.newInstance(ts, 2);
|
ReservationId r2 = ReservationId.newInstance(ts, 2);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
|
plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3",
|
||||||
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
|
"dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
|
||||||
.generateAllocation(3L, 1L, f1), res, minAlloc)));
|
.generateAllocation(3L, 1L, f1), res, minAlloc)));
|
||||||
|
|
||||||
ReservationId r3 = ReservationId.newInstance(ts, 3);
|
ReservationId r3 = ReservationId.newInstance(ts, 3);
|
||||||
int[] f2 = { 0, 10, 20, 10, 0 };
|
int[] f2 = { 0, 10, 20, 10, 0 };
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
|
plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4",
|
||||||
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
|
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
|
||||||
.generateAllocation(10L, 1L, f2), res, minAlloc)));
|
.generateAllocation(10L, 1L, f2), res, minAlloc)));
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
||||||
|
@ -724,6 +725,7 @@ public class TestAlignedPlanner {
|
||||||
policy.init(reservationQ, conf);
|
policy.init(reservationQ, conf);
|
||||||
|
|
||||||
QueueMetrics queueMetrics = mock(QueueMetrics.class);
|
QueueMetrics queueMetrics = mock(QueueMetrics.class);
|
||||||
|
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
|
|
||||||
// Set planning agent
|
// Set planning agent
|
||||||
agent = new AlignedPlannerWithGreedy();
|
agent = new AlignedPlannerWithGreedy();
|
||||||
|
@ -731,7 +733,7 @@ public class TestAlignedPlanner {
|
||||||
// Create Plan
|
// Create Plan
|
||||||
plan =
|
plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
||||||
res, minAlloc, maxAlloc, "dedicated", null, true);
|
res, minAlloc, maxAlloc, "dedicated", null, true, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int initializeScenario1() throws PlanningException {
|
private int initializeScenario1() throws PlanningException {
|
||||||
|
@ -782,9 +784,12 @@ public class TestAlignedPlanner {
|
||||||
private void addFixedAllocation(long start, long step, int[] f)
|
private void addFixedAllocation(long start, long step, int[] f)
|
||||||
throws PlanningException {
|
throws PlanningException {
|
||||||
|
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
start, start + f.length * step, f.length * step);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null,
|
ReservationSystemTestUtil.getNewReservationId(), rDef,
|
||||||
"user_fixed", "dedicated", start, start + f.length * step,
|
"user_fixed", "dedicated", start, start + f.length * step,
|
||||||
ReservationSystemTestUtil.generateAllocation(start, step, f), res,
|
ReservationSystemTestUtil.generateAllocation(start, step, f), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
||||||
|
@ -89,9 +90,10 @@ public class TestGreedyReservationAgent {
|
||||||
agent = new GreedyReservationAgent();
|
agent = new GreedyReservationAgent();
|
||||||
|
|
||||||
QueueMetrics queueMetrics = mock(QueueMetrics.class);
|
QueueMetrics queueMetrics = mock(QueueMetrics.class);
|
||||||
|
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
|
|
||||||
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
||||||
res, minAlloc, maxAlloc, "dedicated", null, true);
|
res, minAlloc, maxAlloc, "dedicated", null, true, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("javadoc")
|
@SuppressWarnings("javadoc")
|
||||||
|
@ -141,9 +143,12 @@ public class TestGreedyReservationAgent {
|
||||||
// create a completely utilized segment around time 30
|
// create a completely utilized segment around time 30
|
||||||
int[] f = { 100, 100 };
|
int[] f = { 100, 100 };
|
||||||
|
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
30 * step, 30 * step + f.length * step, f.length * step);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", 30 * step, 30 * step + f.length * step,
|
"dedicated", 30 * step, 30 * step + f.length * step,
|
||||||
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
|
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -195,10 +200,12 @@ public class TestGreedyReservationAgent {
|
||||||
prepareBasicPlan();
|
prepareBasicPlan();
|
||||||
// create a completely utilized segment at time 30
|
// create a completely utilized segment at time 30
|
||||||
int[] f = { 100, 100 };
|
int[] f = { 100, 100 };
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
30, 30 * step + f.length * step, f.length * step);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", 30 * step, 30 * step + f.length * step,
|
"dedicated", 30 * step, 30 * step + f.length * step,
|
||||||
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
|
ReservationSystemTestUtil.generateAllocation(30 * step, step, f),
|
||||||
res, minAlloc)));
|
res, minAlloc)));
|
||||||
|
@ -515,10 +522,12 @@ public class TestGreedyReservationAgent {
|
||||||
// conditions for assignment that are non-empty
|
// conditions for assignment that are non-empty
|
||||||
|
|
||||||
int[] f = { 10, 10, 20, 20, 20, 10, 10 };
|
int[] f = { 10, 10, 20, 20, 20, 10, 10 };
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
0, 0 + f.length * step, f.length * step);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
|
"dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil
|
||||||
.generateAllocation(0, step, f), res, minAlloc)));
|
.generateAllocation(0, step, f), res, minAlloc)));
|
||||||
|
|
||||||
|
@ -527,7 +536,7 @@ public class TestGreedyReservationAgent {
|
||||||
ReservationSystemTestUtil.generateAllocation(5000, step, f2);
|
ReservationSystemTestUtil.generateAllocation(5000, step, f2);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||||
"dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
|
"dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc)));
|
||||||
|
|
||||||
System.out.println("--------BEFORE AGENT----------");
|
System.out.println("--------BEFORE AGENT----------");
|
||||||
|
@ -562,9 +571,11 @@ public class TestGreedyReservationAgent {
|
||||||
instConstraint, avgConstraint);
|
instConstraint, avgConstraint);
|
||||||
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||||
policy.init(reservationQ, conf);
|
policy.init(reservationQ, conf);
|
||||||
|
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
|
|
||||||
plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
|
plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent,
|
||||||
clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true);
|
clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null,
|
||||||
|
true, context);
|
||||||
|
|
||||||
int acc = 0;
|
int acc = 0;
|
||||||
List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
|
List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
|
||||||
|
|
|
@ -27,15 +27,18 @@ import static org.mockito.Mockito.when;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
@ -67,6 +70,7 @@ public class TestSimpleCapacityReplanner {
|
||||||
when(clock.getTime()).thenReturn(0L);
|
when(clock.getTime()).thenReturn(0L);
|
||||||
SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
|
SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
|
||||||
|
|
||||||
|
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||||
ReservationSchedulerConfiguration conf =
|
ReservationSchedulerConfiguration conf =
|
||||||
mock(ReservationSchedulerConfiguration.class);
|
mock(ReservationSchedulerConfiguration.class);
|
||||||
when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
|
when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
|
||||||
|
@ -76,52 +80,55 @@ public class TestSimpleCapacityReplanner {
|
||||||
// Initialize the plan with more resources
|
// Initialize the plan with more resources
|
||||||
InMemoryPlan plan =
|
InMemoryPlan plan =
|
||||||
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
|
||||||
res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
|
res, minAlloc, maxAlloc, "dedicated", enf, true, context, clock);
|
||||||
|
|
||||||
// add reservation filling the plan (separating them 1ms, so we are sure
|
// add reservation filling the plan (separating them 1ms, so we are sure
|
||||||
// s2 follows s1 on acceptance
|
// s2 follows s1 on acceptance
|
||||||
long ts = System.currentTimeMillis();
|
long ts = System.currentTimeMillis();
|
||||||
ReservationId r1 = ReservationId.newInstance(ts, 1);
|
ReservationId r1 = ReservationId.newInstance(ts, 1);
|
||||||
int[] f5 = { 20, 20, 20, 20, 20 };
|
int[] f5 = { 20, 20, 20, 20, 20 };
|
||||||
|
ReservationDefinition rDef =
|
||||||
|
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||||
|
0, 0 + f5.length, f5.length);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
|
plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3",
|
||||||
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
when(clock.getTime()).thenReturn(1L);
|
when(clock.getTime()).thenReturn(1L);
|
||||||
ReservationId r2 = ReservationId.newInstance(ts, 2);
|
ReservationId r2 = ReservationId.newInstance(ts, 2);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
|
plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4",
|
||||||
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
when(clock.getTime()).thenReturn(2L);
|
when(clock.getTime()).thenReturn(2L);
|
||||||
ReservationId r3 = ReservationId.newInstance(ts, 3);
|
ReservationId r3 = ReservationId.newInstance(ts, 3);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
|
plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5",
|
||||||
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
when(clock.getTime()).thenReturn(3L);
|
when(clock.getTime()).thenReturn(3L);
|
||||||
ReservationId r4 = ReservationId.newInstance(ts, 4);
|
ReservationId r4 = ReservationId.newInstance(ts, 4);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
|
plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6",
|
||||||
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
when(clock.getTime()).thenReturn(4L);
|
when(clock.getTime()).thenReturn(4L);
|
||||||
ReservationId r5 = ReservationId.newInstance(ts, 5);
|
ReservationId r5 = ReservationId.newInstance(ts, 5);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
|
plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7",
|
||||||
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
"dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
|
|
||||||
int[] f6 = { 50, 50, 50, 50, 50 };
|
int[] f6 = { 50, 50, 50, 50, 50 };
|
||||||
ReservationId r6 = ReservationId.newInstance(ts, 6);
|
ReservationId r6 = ReservationId.newInstance(ts, 6);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
|
plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3",
|
||||||
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
|
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
when(clock.getTime()).thenReturn(6L);
|
when(clock.getTime()).thenReturn(6L);
|
||||||
ReservationId r7 = ReservationId.newInstance(ts, 7);
|
ReservationId r7 = ReservationId.newInstance(ts, 7);
|
||||||
assertTrue(plan.toString(),
|
assertTrue(plan.toString(),
|
||||||
plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
|
plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4",
|
||||||
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
|
"dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
|
||||||
minAlloc)));
|
minAlloc)));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue