YARN-5330. SharingPolicy enhancements required to support recurring reservations in ReservationSystem. (Carlo Curino via Subru).

(cherry picked from commit fa6137501c)
This commit is contained in:
Subru Krishnan 2017-09-07 19:07:17 -07:00
parent eb0a00c9b5
commit 8d59b1fbee
6 changed files with 377 additions and 382 deletions

View File

@ -95,26 +95,29 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
throw new PlanningQuotaException(p); throw new PlanningQuotaException(p);
} }
long checkStart = reservation.getStartTime() - validWindow;
long checkEnd = reservation.getEndTime() + validWindow;
//---- check for integral violations of capacity -------- //---- check for integral violations of capacity --------
// Gather a view of what to check (curr allocation of user, minus old // Gather a view of what to check (curr allocation of user, minus old
// version of this reservation, plus new version) // version of this reservation, plus new version)
RLESparseResourceAllocation consumptionForUserOverTime = RLESparseResourceAllocation consumptionForUserOverTime =
plan.getConsumptionForUserOverTime(reservation.getUser(), plan.getConsumptionForUserOverTime(reservation.getUser(),
reservation.getStartTime() - validWindow, checkStart, checkEnd);
reservation.getEndTime() + validWindow);
ReservationAllocation old = ReservationAllocation old =
plan.getReservationById(reservation.getReservationId()); plan.getReservationById(reservation.getReservationId());
if (old != null) { if (old != null) {
consumptionForUserOverTime = RLESparseResourceAllocation consumptionForUserOverTime =
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(), RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
consumptionForUserOverTime, old.getResourcesOverTime(), plan.getTotalCapacity(), consumptionForUserOverTime,
RLEOperator.add, reservation.getStartTime() - validWindow, old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add,
reservation.getEndTime() + validWindow); checkStart, checkEnd);
} }
RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime(); RLESparseResourceAllocation resRLE =
reservation.getResourcesOverTime(checkStart, checkEnd);
RLESparseResourceAllocation toCheck = RLESparseResourceAllocation RLESparseResourceAllocation toCheck = RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(), .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
@ -191,11 +194,11 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
// compare using merge() limit with integral // compare using merge() limit with integral
try { try {
RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(), RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
targetLimit, integral, RLEOperator.subtractTestNonNegative, plan.getTotalCapacity(), targetLimit, integral,
reservation.getStartTime() - validWindow, RLEOperator.subtractTestNonNegative, checkStart, checkEnd);
reservation.getEndTime() + validWindow);
} catch (PlanningException p) { } catch (PlanningException p) {
throw new PlanningQuotaException( throw new PlanningQuotaException(
"Integral (avg over time) quota capacity " + maxAvg "Integral (avg over time) quota capacity " + maxAvg
@ -240,7 +243,8 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
if (old != null) { if (old != null) {
used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(), used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(),
Resources.clone(plan.getTotalCapacity()), used, Resources.clone(plan.getTotalCapacity()), used,
old.getResourcesOverTime(), RLEOperator.subtract, start, end); old.getResourcesOverTime(start, end), RLEOperator.subtract, start,
end);
} }
instRLEQuota = RLESparseResourceAllocation instRLEQuota = RLESparseResourceAllocation

View File

@ -40,13 +40,17 @@ public class NoOverCommitPolicy implements SharingPolicy {
RLESparseResourceAllocation available = plan.getAvailableResourceOverTime( RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
reservation.getUser(), reservation.getReservationId(), reservation.getUser(), reservation.getReservationId(),
reservation.getStartTime(), reservation.getEndTime(), 0); reservation.getStartTime(), reservation.getEndTime(),
reservation.getPeriodicity());
// test the reservation does not exceed what is available // test the reservation does not exceed what is available
try { try {
RLESparseResourceAllocation ask = reservation.getResourcesOverTime(
reservation.getStartTime(), reservation.getEndTime());
RLESparseResourceAllocation RLESparseResourceAllocation
.merge(plan.getResourceCalculator(), plan.getTotalCapacity(), .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
available, reservation.getResourcesOverTime(), available, ask,
RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative, RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
reservation.getStartTime(), reservation.getEndTime()); reservation.getStartTime(), reservation.getEndTime());
} catch (PlanningException p) { } catch (PlanningException p) {

View File

@ -0,0 +1,189 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import net.jcip.annotations.NotThreadSafe;
/**
* This class is a base test for {@code SharingPolicy} implementors.
*/
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public abstract class BaseSharingPolicyTest {
@Parameterized.Parameter(value = 0)
public long duration;
@Parameterized.Parameter(value = 1)
public double height;
@Parameterized.Parameter(value = 2)
public int numSubmissions;
@Parameterized.Parameter(value = 3)
public String recurrenceExpression;
@Parameterized.Parameter(value = 4)
public Class expectedError;
private long step;
private long initTime;
private InMemoryPlan plan;
private ReservationAgent mAgent;
private Resource minAlloc;
private ResourceCalculator res;
private Resource maxAlloc;
private int totCont = 1000;
protected ReservationSchedulerConfiguration conf;
@Before
public void setup() {
// 1 sec step
step = 1000L;
initTime = System.currentTimeMillis();
minAlloc = Resource.newInstance(1024, 1);
res = new DefaultResourceCalculator();
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
Resource clusterResource =
ReservationSystemTestUtil.calculateClusterResource(totCont);
// invoke implementors initialization of policy
SharingPolicy policy = getInitializedPolicy();
RMContext context = ReservationSystemTestUtil.createMockRMContext();
plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource,
step, res, minAlloc, maxAlloc, "dedicated", null, true, context);
}
public void runTest() throws IOException, PlanningException {
long period = 1;
if (recurrenceExpression != null) {
period = Long.parseLong(recurrenceExpression);
}
try {
RLESparseResourceAllocation rle = generateRLEAlloc(period);
// Generate the intervalMap (trimming out-of-period entries)
Map<ReservationInterval, Resource> reservationIntervalResourceMap;
if (period > 1) {
rle = new PeriodicRLESparseResourceAllocation(rle, period);
reservationIntervalResourceMap =
ReservationSystemTestUtil.toAllocation(rle, 0, period);
} else {
reservationIntervalResourceMap = ReservationSystemTestUtil
.toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE);
}
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime % period, initTime % period + duration + 1, duration, 1,
recurrenceExpression);
// perform multiple submissions where required
for (int i = 0; i < numSubmissions; i++) {
long rstart = rle.getEarliestStartTime();
long rend = rle.getLatestNonNullTime();
InMemoryReservationAllocation resAlloc =
new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", rstart, rend, reservationIntervalResourceMap, res,
minAlloc);
assertTrue(plan.toString(), plan.addReservation(resAlloc, false));
}
// fail if error was expected
if (expectedError != null) {
System.out.println(plan.toString());
fail();
}
} catch (Exception e) {
if (expectedError == null || !e.getClass().getCanonicalName()
.equals(expectedError.getCanonicalName())) {
// fail on unexpected errors
throw e;
}
}
}
private RLESparseResourceAllocation generateRLEAlloc(long period) {
RLESparseResourceAllocation rle =
new RLESparseResourceAllocation(new DefaultResourceCalculator());
Resource alloc = Resources.multiply(minAlloc, height * totCont);
// loop in case the periodicity of the reservation is smaller than LCM
long rStart = initTime % period;
long rEnd = initTime % period + duration;
// handle wrap-around
if (period > 1 && rEnd > period) {
long diff = rEnd - period;
rEnd = period;
// handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity)
if(duration > period) {
rle.addInterval(new ReservationInterval(0, period),
Resources.multiply(alloc, duration / period - 1));
rle.addInterval(new ReservationInterval(0, diff % period), alloc);
} else {
rle.addInterval(new ReservationInterval(0, diff), alloc);
}
}
rle.addInterval(new ReservationInterval(rStart, rEnd), alloc);
return rle;
}
public abstract SharingPolicy getInitializedPolicy();
}

View File

@ -1,4 +1,4 @@
/******************************************************************************* /******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*******************************************************************************/ *****************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -466,4 +466,28 @@ public class ReservationSystemTestUtil {
public static Resource calculateClusterResource(int numContainers) { public static Resource calculateClusterResource(int numContainers) {
return Resource.newInstance(numContainers * 1024, numContainers); return Resource.newInstance(numContainers * 1024, numContainers);
} }
public static Map<ReservationInterval, Resource> toAllocation(
RLESparseResourceAllocation rle, long start, long end) {
Map<ReservationInterval, Resource> resAlloc = new TreeMap<>();
for (Map.Entry<Long, Resource> e : rle.getCumulative().entrySet()) {
Long nextKey = rle.getCumulative().higherKey(e.getKey());
if (nextKey == null) {
break;
} else {
if (e.getKey() >= start && e.getKey() <= end && nextKey >= start
&& nextKey <= end) {
resAlloc.put(new ReservationInterval(e.getKey(), nextKey),
e.getValue());
}
}
}
return resAlloc;
}
} }

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,269 +17,118 @@
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Arrays;
import java.util.TreeMap; import java.util.Collection;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
public class TestCapacityOverTimePolicy { /**
* This class tests the {@code CapacityOvertimePolicy} sharing policy.
*/
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest {
long timeWindow; final static long ONEDAY = 86400 * 1000;
long step; final static long ONEHOUR = 3600 * 1000;
float avgConstraint; final static long ONEMINUTE = 60 * 1000;
float instConstraint; final static String TWODAYPERIOD = "7200000";
long initTime; final static String ONEDAYPERIOD = "86400000";
InMemoryPlan plan; @Parameterized.Parameters(name = "Duration {0}, height {1}," +
ReservationAgent mAgent; " submission {2}, periodic {3})")
Resource minAlloc; public static Collection<Object[]> data() {
ResourceCalculator res; return Arrays.asList(new Object[][] {
Resource maxAlloc;
int totCont = 1000000; // easy fit
{ONEHOUR, 0.25, 1, null, null },
{ONEHOUR, 0.25, 1, TWODAYPERIOD, null },
{ONEHOUR, 0.25, 1, ONEDAYPERIOD, null },
@Before // instantaneous high, but fit integral and inst limits
public void setup() throws Exception { {ONEMINUTE, 0.74, 1, null, null },
{ONEMINUTE, 0.74, 1, TWODAYPERIOD, null },
{ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null },
// barely fit
{ONEHOUR, 0.76, 1, null, PlanningQuotaException.class },
{ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class },
{ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// overcommit with single reservation
{ONEHOUR, 1.1, 1, null, PlanningQuotaException.class },
{ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class },
{ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// barely fit with multiple reservations (instantaneously, lowering to
// 1min to fit integral)
{ONEMINUTE, 0.25, 3, null, null },
{ONEMINUTE, 0.25, 3, TWODAYPERIOD, null },
{ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null },
// overcommit with multiple reservations (instantaneously)
{ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class },
{ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class },
{ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class },
// (non-periodic) reservation longer than window
{25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class },
{25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class },
{25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// (non-periodic) reservation longer than window
{25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class },
{25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class },
{25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class },
// overcommit integral
{ONEDAY, 0.26, 1, null, PlanningQuotaException.class },
{2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class },
{2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class },
// overcommit integral
{ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class },
{2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD,
PlanningQuotaException.class },
{2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class }
});
}
@Override
public SharingPolicy getInitializedPolicy() {
// 24h window // 24h window
timeWindow = 86400000L; long timeWindow = 86400000L;
// 1 sec step // 1 sec step
step = 1000L; long step = 1000L;
// 25% avg cap on capacity // 25% avg cap on capacity
avgConstraint = 25; float avgConstraint = 25;
// 70% instantaneous cap on capacity // 70% instantaneous cap on capacity
instConstraint = 70; float instConstraint = 75;
initTime = System.currentTimeMillis();
minAlloc = Resource.newInstance(1024, 1);
res = new DefaultResourceCalculator();
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class);
String reservationQ = String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName(); ReservationSystemTestUtil.getFullReservationQueueName();
Resource clusterResource = conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
ReservationSystemTestUtil.calculateClusterResource(totCont); instConstraint, avgConstraint);
ReservationSchedulerConfiguration conf =
ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf); policy.init(reservationQ, conf);
RMContext context = ReservationSystemTestUtil.createMockRMContext(); return policy;
plan =
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
clusterResource, step, res, minAlloc, maxAlloc,
"dedicated", null, true, context);
}
public int[] generateData(int length, int val) {
int[] data = new int[length];
for (int i = 0; i < length; i++) {
data[i] = val;
}
return data;
} }
@Test @Test
public void testSimplePass() throws IOException, PlanningException { public void testAllocation() throws IOException, PlanningException {
// generate allocation that simply fit within all constraints runTest();
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
} }
@Test(expected = PlanningException.class) }
public void testAllocationLargerThanValidWindow() throws IOException,
PlanningException {
// generate allocation that exceed the validWindow
int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
@Test
public void testSimplePass2() throws IOException, PlanningException {
// generate allocation from single tenant that exceed avg momentarily but
// fit within
// max instantanesou
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
@Test
public void testMultiTenantPass() throws IOException, PlanningException {
// generate allocation from multiple tenants that barely fit in tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
for (int i = 0; i < 4; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
}
@Test(expected = PlanningQuotaException.class)
public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
for (int i = 0; i < 5; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
}
@Test(expected = PlanningQuotaException.class)
public void testInstFail() throws IOException, PlanningException {
// generate allocation that exceed the instantaneous cap single-show
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
Assert.fail("should not have accepted this");
}
@Test
public void testInstFailBySum() throws IOException, PlanningException {
// generate allocation that exceed the instantaneous cap by sum
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
Assert.fail();
} catch (PlanningQuotaException p) {
// expected
}
}
@Test(expected = PlanningQuotaException.class)
public void testFailAvg() throws IOException, PlanningException {
// generate an allocation which violates the 25% average single-shot
Map<ReservationInterval, Resource> req =
new TreeMap<ReservationInterval, Resource>();
long win = timeWindow / 2 + 100;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
ReservationSystemUtil.toResource(
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
cont)));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + win, win);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
}
@Test
public void testFailAvgBySum() throws IOException, PlanningException {
// generate an allocation which violates the 25% average by sum
Map<ReservationInterval, Resource> req =
new TreeMap<ReservationInterval, Resource>();
long win = 86400000 / 4 + 1;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource
.newInstance(1024, 1), cont)));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + win, win);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc), false));
Assert.fail("should not have accepted this");
} catch (PlanningQuotaException e) {
// expected
}
}
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,145 +17,70 @@
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
public class TestNoOverCommitPolicy { /**
* This clas tests {@code NoOverCommitPolicy} sharing policy.
*/
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public class TestNoOverCommitPolicy extends BaseSharingPolicyTest {
long step; final static long ONEHOUR = 3600 * 1000;
long initTime; final static String TWOHOURPERIOD = "7200000";
InMemoryPlan plan; @Parameterized.Parameters(name = "Duration {0}, height {1}," +
ReservationAgent mAgent; " submissions {2}, periodic {3})")
Resource minAlloc; public static Collection<Object[]> data() {
ResourceCalculator res; return Arrays.asList(new Object[][] {
Resource maxAlloc;
int totCont = 1000000; // easy fit
{ONEHOUR, 0.25, 1, null, null },
{ONEHOUR, 0.25, 1, TWOHOURPERIOD, null },
@Before // barely fit
public void setup() throws Exception { {ONEHOUR, 1, 1, null, null },
{ONEHOUR, 1, 1, TWOHOURPERIOD, null },
// 1 sec step // overcommit with single reservation
step = 1000L; {ONEHOUR, 1.1, 1, null, ResourceOverCommitException.class },
{ONEHOUR, 1.1, 1, TWOHOURPERIOD, ResourceOverCommitException.class },
initTime = System.currentTimeMillis(); // barely fit with multiple reservations
minAlloc = Resource.newInstance(1024, 1); {ONEHOUR, 0.25, 4, null, null },
res = new DefaultResourceCalculator(); {ONEHOUR, 0.25, 4, TWOHOURPERIOD, null },
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class); // overcommit with multiple reservations
{ONEHOUR, 0.25, 5, null, ResourceOverCommitException.class },
{ONEHOUR, 0.25, 5, TWOHOURPERIOD, ResourceOverCommitException.class }
});
}
@Override
public SharingPolicy getInitializedPolicy() {
String reservationQ = String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName(); ReservationSystemTestUtil.getFullReservationQueueName();
QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); conf = new CapacitySchedulerConfiguration();
Resource clusterResource = SharingPolicy policy = new NoOverCommitPolicy();
ReservationSystemTestUtil.calculateClusterResource(totCont);
ReservationSchedulerConfiguration conf = mock
(ReservationSchedulerConfiguration.class);
NoOverCommitPolicy policy = new NoOverCommitPolicy();
policy.init(reservationQ, conf); policy.init(reservationQ, conf);
RMContext context = ReservationSystemTestUtil.createMockRMContext(); return policy;
plan =
new InMemoryPlan(rootQueueMetrics, policy, mAgent,
clusterResource, step, res, minAlloc, maxAlloc,
"dedicated", null, true, context);
}
public int[] generateData(int length, int val) {
int[] data = new int[length];
for (int i = 0; i < length; i++) {
data[i] = val;
}
return data;
} }
@Test @Test
public void testSingleUserEasyFitPass() throws IOException, PlanningException { public void testAllocation() throws IOException, PlanningException {
// generate allocation that easily fit within resource constraints runTest();
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
} }
@Test }
public void testSingleUserBarelyFitPass() throws IOException,
PlanningException {
// generate allocation from single tenant that barely fit
int[] f = generateData(3600, totCont);
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
@Test(expected = ResourceOverCommitException.class)
public void testSingleFail() throws IOException, PlanningException {
// generate allocation from single tenant that exceed capacity
int[] f = generateData(3600, (int) (1.1 * totCont));
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc), false);
}
@Test
public void testMultiTenantPass() throws IOException, PlanningException {
// generate allocation from multiple tenants that barely fit in tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
for (int i = 0; i < 4; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
}
@Test(expected = ResourceOverCommitException.class)
public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
ReservationDefinition rDef =
ReservationSystemTestUtil.createSimpleReservationDefinition(
initTime, initTime + f.length + 1, f.length);
for (int i = 0; i < 5; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc), false));
}
}
}