YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).
This commit is contained in:
parent
56d93d2e39
commit
fa6137501c
|
@ -95,26 +95,29 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
|
|||
throw new PlanningQuotaException(p);
|
||||
}
|
||||
|
||||
long checkStart = reservation.getStartTime() - validWindow;
|
||||
long checkEnd = reservation.getEndTime() + validWindow;
|
||||
|
||||
//---- check for integral violations of capacity --------
|
||||
|
||||
// Gather a view of what to check (curr allocation of user, minus old
|
||||
// version of this reservation, plus new version)
|
||||
RLESparseResourceAllocation consumptionForUserOverTime =
|
||||
plan.getConsumptionForUserOverTime(reservation.getUser(),
|
||||
reservation.getStartTime() - validWindow,
|
||||
reservation.getEndTime() + validWindow);
|
||||
checkStart, checkEnd);
|
||||
|
||||
ReservationAllocation old =
|
||||
plan.getReservationById(reservation.getReservationId());
|
||||
if (old != null) {
|
||||
consumptionForUserOverTime = RLESparseResourceAllocation
|
||||
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
|
||||
consumptionForUserOverTime, old.getResourcesOverTime(),
|
||||
RLEOperator.add, reservation.getStartTime() - validWindow,
|
||||
reservation.getEndTime() + validWindow);
|
||||
consumptionForUserOverTime =
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
plan.getTotalCapacity(), consumptionForUserOverTime,
|
||||
old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add,
|
||||
checkStart, checkEnd);
|
||||
}
|
||||
|
||||
RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime();
|
||||
RLESparseResourceAllocation resRLE =
|
||||
reservation.getResourcesOverTime(checkStart, checkEnd);
|
||||
|
||||
RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
|
||||
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
|
||||
|
@ -191,11 +194,11 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
|
|||
|
||||
// compare using merge() limit with integral
|
||||
try {
|
||||
RLESparseResourceAllocation
|
||||
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
|
||||
targetLimit, integral, RLEOperator.subtractTestNonNegative,
|
||||
reservation.getStartTime() - validWindow,
|
||||
reservation.getEndTime() + validWindow);
|
||||
|
||||
RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
plan.getTotalCapacity(), targetLimit, integral,
|
||||
RLEOperator.subtractTestNonNegative, checkStart, checkEnd);
|
||||
|
||||
} catch (PlanningException p) {
|
||||
throw new PlanningQuotaException(
|
||||
"Integral (avg over time) quota capacity " + maxAvg
|
||||
|
@ -240,7 +243,8 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
|
|||
if (old != null) {
|
||||
used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
|
||||
Resources.clone(plan.getTotalCapacity()), used,
|
||||
old.getResourcesOverTime(), RLEOperator.subtract, start, end);
|
||||
old.getResourcesOverTime(start, end), RLEOperator.subtract, start,
|
||||
end);
|
||||
}
|
||||
|
||||
instRLEQuota = RLESparseResourceAllocation
|
||||
|
|
|
@ -40,13 +40,17 @@ public class NoOverCommitPolicy implements SharingPolicy {
|
|||
|
||||
RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
|
||||
reservation.getUser(), reservation.getReservationId(),
|
||||
reservation.getStartTime(), reservation.getEndTime(), 0);
|
||||
reservation.getStartTime(), reservation.getEndTime(),
|
||||
reservation.getPeriodicity());
|
||||
|
||||
// test the reservation does not exceed what is available
|
||||
try {
|
||||
|
||||
RLESparseResourceAllocation ask = reservation.getResourcesOverTime(
|
||||
reservation.getStartTime(), reservation.getEndTime());
|
||||
RLESparseResourceAllocation
|
||||
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
|
||||
available, reservation.getResourcesOverTime(),
|
||||
available, ask,
|
||||
RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
|
||||
reservation.getStartTime(), reservation.getEndTime());
|
||||
} catch (PlanningException p) {
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
/*******************************************************************************
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*******************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import static junit.framework.TestCase.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Before;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import net.jcip.annotations.NotThreadSafe;
|
||||
|
||||
/**
|
||||
* This class is a base test for {@code SharingPolicy} implementors.
|
||||
*/
|
||||
@RunWith(value = Parameterized.class)
|
||||
@NotThreadSafe
|
||||
@SuppressWarnings("VisibilityModifier")
|
||||
public abstract class BaseSharingPolicyTest {
|
||||
|
||||
@Parameterized.Parameter(value = 0)
|
||||
public long duration;
|
||||
|
||||
@Parameterized.Parameter(value = 1)
|
||||
public double height;
|
||||
|
||||
@Parameterized.Parameter(value = 2)
|
||||
public int numSubmissions;
|
||||
|
||||
@Parameterized.Parameter(value = 3)
|
||||
public String recurrenceExpression;
|
||||
|
||||
@Parameterized.Parameter(value = 4)
|
||||
public Class expectedError;
|
||||
|
||||
private long step;
|
||||
private long initTime;
|
||||
|
||||
private InMemoryPlan plan;
|
||||
private ReservationAgent mAgent;
|
||||
private Resource minAlloc;
|
||||
private ResourceCalculator res;
|
||||
private Resource maxAlloc;
|
||||
|
||||
private int totCont = 1000;
|
||||
|
||||
protected ReservationSchedulerConfiguration conf;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
// 1 sec step
|
||||
step = 1000L;
|
||||
initTime = System.currentTimeMillis();
|
||||
|
||||
minAlloc = Resource.newInstance(1024, 1);
|
||||
res = new DefaultResourceCalculator();
|
||||
maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||
|
||||
mAgent = mock(ReservationAgent.class);
|
||||
|
||||
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
|
||||
Resource clusterResource =
|
||||
ReservationSystemTestUtil.calculateClusterResource(totCont);
|
||||
|
||||
// invoke implementors initialization of policy
|
||||
SharingPolicy policy = getInitializedPolicy();
|
||||
|
||||
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||
|
||||
plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
|
||||
step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
|
||||
}
|
||||
|
||||
public void runTest() throws IOException, PlanningException {
|
||||
|
||||
long period = 1;
|
||||
if (recurrenceExpression != null) {
|
||||
period = Long.parseLong(recurrenceExpression);
|
||||
}
|
||||
|
||||
try {
|
||||
RLESparseResourceAllocation rle = generateRLEAlloc(period);
|
||||
|
||||
// Generate the intervalMap (trimming out-of-period entries)
|
||||
Map<ReservationInterval, Resource> reservationIntervalResourceMap;
|
||||
if (period > 1) {
|
||||
rle = new PeriodicRLESparseResourceAllocation(rle, period);
|
||||
reservationIntervalResourceMap =
|
||||
ReservationSystemTestUtil.toAllocation(rle, 0, period);
|
||||
} else {
|
||||
reservationIntervalResourceMap = ReservationSystemTestUtil
|
||||
.toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime % period, initTime % period + duration + 1, duration, 1,
|
||||
recurrenceExpression);
|
||||
|
||||
// perform multiple submissions where required
|
||||
for (int i = 0; i < numSubmissions; i++) {
|
||||
|
||||
long rstart = rle.getEarliestStartTime();
|
||||
long rend = rle.getLatestNonNullTime();
|
||||
|
||||
InMemoryReservationAllocation resAlloc =
|
||||
new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", rstart, rend, reservationIntervalResourceMap, res,
|
||||
minAlloc);
|
||||
|
||||
assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
|
||||
}
|
||||
// fail if error was expected
|
||||
if (expectedError != null) {
|
||||
System.out.println(plan.toString());
|
||||
fail();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (expectedError == null || !e.getClass().getCanonicalName()
|
||||
.equals(expectedError.getCanonicalName())) {
|
||||
// fail on unexpected errors
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private RLESparseResourceAllocation generateRLEAlloc(long period) {
|
||||
RLESparseResourceAllocation rle =
|
||||
new RLESparseResourceAllocation(new DefaultResourceCalculator());
|
||||
|
||||
Resource alloc = Resources.multiply(minAlloc, height * totCont);
|
||||
|
||||
// loop in case the periodicity of the reservation is smaller than LCM
|
||||
long rStart = initTime % period;
|
||||
long rEnd = initTime % period + duration;
|
||||
|
||||
|
||||
// handle wrap-around
|
||||
if (period > 1 && rEnd > period) {
|
||||
long diff = rEnd - period;
|
||||
rEnd = period;
|
||||
|
||||
// handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
|
||||
if(duration > period) {
|
||||
rle.addInterval(new ReservationInterval(0, period),
|
||||
Resources.multiply(alloc, duration / period - 1));
|
||||
rle.addInterval(new ReservationInterval(0, diff % period), alloc);
|
||||
} else {
|
||||
rle.addInterval(new ReservationInterval(0, diff), alloc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
|
||||
return rle;
|
||||
}
|
||||
|
||||
public abstract SharingPolicy getInitializedPolicy();
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
/*******************************************************************************
|
||||
/******************************************************************************
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -14,7 +14,7 @@
|
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*******************************************************************************/
|
||||
*****************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -466,4 +466,28 @@ public class ReservationSystemTestUtil {
|
|||
public static Resource calculateClusterResource(int numContainers) {
|
||||
return Resource.newInstance(numContainers * 1024, numContainers);
|
||||
}
|
||||
|
||||
|
||||
public static Map<ReservationInterval, Resource> toAllocation(
|
||||
RLESparseResourceAllocation rle, long start, long end) {
|
||||
Map<ReservationInterval, Resource> resAlloc = new TreeMap<>();
|
||||
|
||||
for (Map.Entry<Long, Resource> e : rle.getCumulative().entrySet()) {
|
||||
Long nextKey = rle.getCumulative().higherKey(e.getKey());
|
||||
if (nextKey == null) {
|
||||
break;
|
||||
} else {
|
||||
if (e.getKey() >= start && e.getKey() <= end && nextKey >= start
|
||||
&& nextKey <= end) {
|
||||
resAlloc.put(new ReservationInterval(e.getKey(), nextKey),
|
||||
e.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return resAlloc;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -17,269 +17,118 @@
|
|||
*******************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import net.jcip.annotations.NotThreadSafe;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
public class TestCapacityOverTimePolicy {
|
||||
/**
|
||||
* This class tests the {@code CapacityOvertimePolicy} sharing policy.
|
||||
*/
|
||||
@RunWith(value = Parameterized.class)
|
||||
@NotThreadSafe
|
||||
@SuppressWarnings("VisibilityModifier")
|
||||
public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
|
||||
|
||||
long timeWindow;
|
||||
long step;
|
||||
float avgConstraint;
|
||||
float instConstraint;
|
||||
long initTime;
|
||||
final static long ONEDAY = 86400 * 1000;
|
||||
final static long ONEHOUR = 3600 * 1000;
|
||||
final static long ONEMINUTE = 60 * 1000;
|
||||
final static String TWODAYPERIOD = "7200000";
|
||||
final static String ONEDAYPERIOD = "86400000";
|
||||
|
||||
InMemoryPlan plan;
|
||||
ReservationAgent mAgent;
|
||||
Resource minAlloc;
|
||||
ResourceCalculator res;
|
||||
Resource maxAlloc;
|
||||
@Parameterized.Parameters(name = "Duration {0}, height {1}," +
|
||||
" submission {2}, periodic {3})")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
||||
int totCont = 1000000;
|
||||
// easy fit
|
||||
{ONEHOUR, 0.25, 1, null, null },
|
||||
{ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
|
||||
{ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
// instantaneous high, but fit integral and inst limits
|
||||
{ONEMINUTE, 0.74, 1, null, null },
|
||||
{ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
|
||||
{ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
|
||||
|
||||
// barely fit
|
||||
{ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
|
||||
{ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// overcommit with single reservation
|
||||
{ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
|
||||
{ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// barely fit with multiple reservations (instantaneously, lowering to
|
||||
// 1min to fit integral)
|
||||
{ONEMINUTE, 0.25, 3, null, null },
|
||||
{ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
|
||||
{ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
|
||||
|
||||
// overcommit with multiple reservations (instantaneously)
|
||||
{ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
|
||||
{ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// (non-periodic) reservation longer than window
|
||||
{25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// (non-periodic) reservation longer than window
|
||||
{25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// overcommit integral
|
||||
{ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
|
||||
{2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
|
||||
{2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
|
||||
|
||||
// overcommit integral
|
||||
{ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
|
||||
{2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
|
||||
PlanningQuotaException.class },
|
||||
{2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public SharingPolicy getInitializedPolicy() {
|
||||
|
||||
// 24h window
|
||||
timeWindow = 86400000L;
|
||||
long timeWindow = 86400000L;
|
||||
|
||||
// 1 sec step
|
||||
step = 1000L;
|
||||
long step = 1000L;
|
||||
|
||||
// 25% avg cap on capacity
|
||||
avgConstraint = 25;
|
||||
float avgConstraint = 25;
|
||||
|
||||
// 70% instantaneous cap on capacity
|
||||
instConstraint = 70;
|
||||
float instConstraint = 75;
|
||||
|
||||
initTime = System.currentTimeMillis();
|
||||
minAlloc = Resource.newInstance(1024, 1);
|
||||
res = new DefaultResourceCalculator();
|
||||
maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||
|
||||
mAgent = mock(ReservationAgent.class);
|
||||
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
|
||||
String reservationQ =
|
||||
ReservationSystemTestUtil.getFullReservationQueueName();
|
||||
Resource clusterResource =
|
||||
ReservationSystemTestUtil.calculateClusterResource(totCont);
|
||||
ReservationSchedulerConfiguration conf =
|
||||
ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
|
||||
instConstraint, avgConstraint);
|
||||
conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
|
||||
instConstraint, avgConstraint);
|
||||
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||
policy.init(reservationQ, conf);
|
||||
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||
|
||||
plan =
|
||||
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
|
||||
clusterResource, step, res, minAlloc, maxAlloc,
|
||||
"dedicated", null, true, context);
|
||||
}
|
||||
|
||||
public int[] generateData(int length, int val) {
|
||||
int[] data = new int[length];
|
||||
for (int i = 0; i < length; i++) {
|
||||
data[i] = val;
|
||||
}
|
||||
return data;
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimplePass() throws IOException, PlanningException {
|
||||
// generate allocation that simply fit within all constraints
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
||||
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
public void testAllocation() throws IOException, PlanningException {
|
||||
runTest();
|
||||
}
|
||||
|
||||
@Test(expected = PlanningException.class)
|
||||
public void testAllocationLargerThanValidWindow() throws IOException,
|
||||
PlanningException {
|
||||
// generate allocation that exceed the validWindow
|
||||
int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
|
||||
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimplePass2() throws IOException, PlanningException {
|
||||
// generate allocation from single tenant that exceed avg momentarily but
|
||||
// fit within
|
||||
// max instantanesou
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = PlanningQuotaException.class)
|
||||
public void testMultiTenantFail() throws IOException, PlanningException {
|
||||
// generate allocation from multiple tenants that exceed tot capacity
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = PlanningQuotaException.class)
|
||||
public void testInstFail() throws IOException, PlanningException {
|
||||
// generate allocation that exceed the instantaneous cap single-show
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
Assert.fail("should not have accepted this");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInstFailBySum() throws IOException, PlanningException {
|
||||
// generate allocation that exceed the instantaneous cap by sum
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
try {
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
Assert.fail();
|
||||
} catch (PlanningQuotaException p) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = PlanningQuotaException.class)
|
||||
public void testFailAvg() throws IOException, PlanningException {
|
||||
// generate an allocation which violates the 25% average single-shot
|
||||
Map<ReservationInterval, Resource> req =
|
||||
new TreeMap<ReservationInterval, Resource>();
|
||||
long win = timeWindow / 2 + 100;
|
||||
int cont = (int) Math.ceil(0.5 * totCont);
|
||||
req.put(new ReservationInterval(initTime, initTime + win),
|
||||
ReservationSystemUtil.toResource(
|
||||
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
|
||||
cont)));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + win, win);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailAvgBySum() throws IOException, PlanningException {
|
||||
// generate an allocation which violates the 25% average by sum
|
||||
Map<ReservationInterval, Resource> req =
|
||||
new TreeMap<ReservationInterval, Resource>();
|
||||
long win = 86400000 / 4 + 1;
|
||||
int cont = (int) Math.ceil(0.5 * totCont);
|
||||
req.put(new ReservationInterval(initTime, initTime + win),
|
||||
ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
|
||||
.newInstance(1024, 1), cont)));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + win, win);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
|
||||
|
||||
try {
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
|
||||
|
||||
Assert.fail("should not have accepted this");
|
||||
} catch (PlanningQuotaException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -6,9 +6,9 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -17,145 +17,70 @@
|
|||
*******************************************************************************/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import net.jcip.annotations.NotThreadSafe;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.junit.Before;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
public class TestNoOverCommitPolicy {
|
||||
/**
|
||||
* This clas tests {@code NoOverCommitPolicy} sharing policy.
|
||||
*/
|
||||
@RunWith(value = Parameterized.class)
|
||||
@NotThreadSafe
|
||||
@SuppressWarnings("VisibilityModifier")
|
||||
public class TestNoOverCommitPolicy extends BaseSharingPolicyTest {
|
||||
|
||||
long step;
|
||||
long initTime;
|
||||
final static long ONEHOUR = 3600 * 1000;
|
||||
final static String TWOHOURPERIOD = "7200000";
|
||||
|
||||
InMemoryPlan plan;
|
||||
ReservationAgent mAgent;
|
||||
Resource minAlloc;
|
||||
ResourceCalculator res;
|
||||
Resource maxAlloc;
|
||||
@Parameterized.Parameters(name = "Duration {0}, height {1}," +
|
||||
" submissions {2}, periodic {3})")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
|
||||
int totCont = 1000000;
|
||||
// easy fit
|
||||
{ONEHOUR, 0.25, 1, null, null },
|
||||
{ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
// barely fit
|
||||
{ONEHOUR, 1, 1, null, null },
|
||||
{ONEHOUR, 1, 1, TWOHOURPERIOD, null },
|
||||
|
||||
// 1 sec step
|
||||
step = 1000L;
|
||||
// overcommit with single reservation
|
||||
{ONEHOUR, 1.1, 1, null, ResourceOverCommitException.class },
|
||||
{ONEHOUR, 1.1, 1, TWOHOURPERIOD, ResourceOverCommitException.class },
|
||||
|
||||
initTime = System.currentTimeMillis();
|
||||
minAlloc = Resource.newInstance(1024, 1);
|
||||
res = new DefaultResourceCalculator();
|
||||
maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||
// barely fit with multiple reservations
|
||||
{ONEHOUR, 0.25, 4, null, null },
|
||||
{ONEHOUR, 0.25, 4, TWOHOURPERIOD, null },
|
||||
|
||||
mAgent = mock(ReservationAgent.class);
|
||||
// overcommit with multiple reservations
|
||||
{ONEHOUR, 0.25, 5, null, ResourceOverCommitException.class },
|
||||
{ONEHOUR, 0.25, 5, TWOHOURPERIOD, ResourceOverCommitException.class }
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public SharingPolicy getInitializedPolicy() {
|
||||
String reservationQ =
|
||||
ReservationSystemTestUtil.getFullReservationQueueName();
|
||||
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
|
||||
Resource clusterResource =
|
||||
ReservationSystemTestUtil.calculateClusterResource(totCont);
|
||||
ReservationSchedulerConfiguration conf = mock
|
||||
(ReservationSchedulerConfiguration.class);
|
||||
NoOverCommitPolicy policy = new NoOverCommitPolicy();
|
||||
conf = new CapacitySchedulerConfiguration();
|
||||
SharingPolicy policy = new NoOverCommitPolicy();
|
||||
policy.init(reservationQ, conf);
|
||||
RMContext context = ReservationSystemTestUtil.createMockRMContext();
|
||||
|
||||
plan =
|
||||
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
|
||||
clusterResource, step, res, minAlloc, maxAlloc,
|
||||
"dedicated", null, true, context);
|
||||
}
|
||||
|
||||
public int[] generateData(int length, int val) {
|
||||
int[] data = new int[length];
|
||||
for (int i = 0; i < length; i++) {
|
||||
data[i] = val;
|
||||
}
|
||||
return data;
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleUserEasyFitPass() throws IOException, PlanningException {
|
||||
// generate allocation that easily fit within resource constraints
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
public void testAllocation() throws IOException, PlanningException {
|
||||
runTest();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleUserBarelyFitPass() throws IOException,
|
||||
PlanningException {
|
||||
// generate allocation from single tenant that barely fit
|
||||
int[] f = generateData(3600, totCont);
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
|
||||
@Test(expected = ResourceOverCommitException.class)
|
||||
public void testSingleFail() throws IOException, PlanningException {
|
||||
// generate allocation from single tenant that exceed capacity
|
||||
int[] f = generateData(3600, (int) (1.1 * totCont));
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||
.generateAllocation(initTime, step, f), res, minAlloc), false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = ResourceOverCommitException.class)
|
||||
public void testMultiTenantFail() throws IOException, PlanningException {
|
||||
// generate allocation from multiple tenants that exceed tot capacity
|
||||
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||
ReservationDefinition rDef =
|
||||
ReservationSystemTestUtil.createSimpleReservationDefinition(
|
||||
initTime, initTime + f.length + 1, f.length);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
assertTrue(plan.toString(),
|
||||
plan.addReservation(new InMemoryReservationAllocation(
|
||||
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
|
||||
"dedicated", initTime, initTime + f.length,
|
||||
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||
res, minAlloc), false));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue