diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java index bb1a4e82789..acd577475ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -95,26 +95,29 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy { throw new PlanningQuotaException(p); } + long checkStart = reservation.getStartTime() - validWindow; + long checkEnd = reservation.getEndTime() + validWindow; + //---- check for integral violations of capacity -------- // Gather a view of what to check (curr allocation of user, minus old // version of this reservation, plus new version) RLESparseResourceAllocation consumptionForUserOverTime = plan.getConsumptionForUserOverTime(reservation.getUser(), - reservation.getStartTime() - validWindow, - reservation.getEndTime() + validWindow); + checkStart, checkEnd); ReservationAllocation old = plan.getReservationById(reservation.getReservationId()); if (old != null) { - consumptionForUserOverTime = RLESparseResourceAllocation - .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), - consumptionForUserOverTime, old.getResourcesOverTime(), - RLEOperator.add, reservation.getStartTime() - validWindow, - reservation.getEndTime() + validWindow); + consumptionForUserOverTime = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + plan.getTotalCapacity(), consumptionForUserOverTime, + old.getResourcesOverTime(checkStart, checkEnd), RLEOperator.add, + checkStart, checkEnd); } - RLESparseResourceAllocation resRLE = reservation.getResourcesOverTime(); + RLESparseResourceAllocation resRLE = + reservation.getResourcesOverTime(checkStart, checkEnd); RLESparseResourceAllocation toCheck = RLESparseResourceAllocation .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), @@ -191,11 +194,11 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy { // compare using merge() limit with integral try { - RLESparseResourceAllocation - .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), - targetLimit, integral, RLEOperator.subtractTestNonNegative, - reservation.getStartTime() - validWindow, - reservation.getEndTime() + validWindow); + + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + plan.getTotalCapacity(), targetLimit, integral, + RLEOperator.subtractTestNonNegative, checkStart, checkEnd); + } catch (PlanningException p) { throw new PlanningQuotaException( "Integral (avg over time) quota capacity " + maxAvg @@ -240,7 +243,8 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy { if (old != null) { used = RLESparseResourceAllocation.merge(plan.getResourceCalculator(), Resources.clone(plan.getTotalCapacity()), used, - old.getResourcesOverTime(), RLEOperator.subtract, start, end); + old.getResourcesOverTime(start, end), RLEOperator.subtract, start, + end); } instRLEQuota = RLESparseResourceAllocation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index 49d470211d4..98ef5828760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -40,13 +40,17 @@ public class NoOverCommitPolicy implements SharingPolicy { RLESparseResourceAllocation available = plan.getAvailableResourceOverTime( reservation.getUser(), reservation.getReservationId(), - reservation.getStartTime(), reservation.getEndTime(), 0); + reservation.getStartTime(), reservation.getEndTime(), + reservation.getPeriodicity()); // test the reservation does not exceed what is available try { + + RLESparseResourceAllocation ask = reservation.getResourcesOverTime( + reservation.getStartTime(), reservation.getEndTime()); RLESparseResourceAllocation .merge(plan.getResourceCalculator(), plan.getTotalCapacity(), - available, reservation.getResourcesOverTime(), + available, ask, RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative, reservation.getStartTime(), reservation.getEndTime()); } catch (PlanningException p) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java new file mode 100644 index 00000000000..294564a2d0c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/BaseSharingPolicyTest.java @@ -0,0 +1,189 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import net.jcip.annotations.NotThreadSafe; + +/** + * This class is a base test for {@code SharingPolicy} implementors. + */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +@SuppressWarnings("VisibilityModifier") +public abstract class BaseSharingPolicyTest { + + @Parameterized.Parameter(value = 0) + public long duration; + + @Parameterized.Parameter(value = 1) + public double height; + + @Parameterized.Parameter(value = 2) + public int numSubmissions; + + @Parameterized.Parameter(value = 3) + public String recurrenceExpression; + + @Parameterized.Parameter(value = 4) + public Class expectedError; + + private long step; + private long initTime; + + private InMemoryPlan plan; + private ReservationAgent mAgent; + private Resource minAlloc; + private ResourceCalculator res; + private Resource maxAlloc; + + private int totCont = 1000; + + protected ReservationSchedulerConfiguration conf; + + @Before + public void setup() { + // 1 sec step + step = 1000L; + initTime = System.currentTimeMillis(); + + minAlloc = Resource.newInstance(1024, 1); + res = new DefaultResourceCalculator(); + maxAlloc = Resource.newInstance(1024 * 8, 8); + + mAgent = mock(ReservationAgent.class); + + QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); + Resource clusterResource = + ReservationSystemTestUtil.calculateClusterResource(totCont); + + // invoke implementors initialization of policy + SharingPolicy policy = getInitializedPolicy(); + + RMContext context = ReservationSystemTestUtil.createMockRMContext(); + + plan = new InMemoryPlan(rootQueueMetrics, policy, mAgent, clusterResource, + step, res, minAlloc, maxAlloc, "dedicated", null, true, context); + } + + public void runTest() throws IOException, PlanningException { + + long period = 1; + if (recurrenceExpression != null) { + period = Long.parseLong(recurrenceExpression); + } + + try { + RLESparseResourceAllocation rle = generateRLEAlloc(period); + + // Generate the intervalMap (trimming out-of-period entries) + Map reservationIntervalResourceMap; + if (period > 1) { + rle = new PeriodicRLESparseResourceAllocation(rle, period); + reservationIntervalResourceMap = + ReservationSystemTestUtil.toAllocation(rle, 0, period); + } else { + reservationIntervalResourceMap = ReservationSystemTestUtil + .toAllocation(rle, Long.MIN_VALUE, Long.MAX_VALUE); + } + + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + initTime % period, initTime % period + duration + 1, duration, 1, + recurrenceExpression); + + // perform multiple submissions where required + for (int i = 0; i < numSubmissions; i++) { + + long rstart = rle.getEarliestStartTime(); + long rend = rle.getLatestNonNullTime(); + + InMemoryReservationAllocation resAlloc = + new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", + "dedicated", rstart, rend, reservationIntervalResourceMap, res, + minAlloc); + + assertTrue(plan.toString(), plan.addReservation(resAlloc, false)); + } + // fail if error was expected + if (expectedError != null) { + System.out.println(plan.toString()); + fail(); + } + } catch (Exception e) { + if (expectedError == null || !e.getClass().getCanonicalName() + .equals(expectedError.getCanonicalName())) { + // fail on unexpected errors + throw e; + } + } + } + + private RLESparseResourceAllocation generateRLEAlloc(long period) { + RLESparseResourceAllocation rle = + new RLESparseResourceAllocation(new DefaultResourceCalculator()); + + Resource alloc = Resources.multiply(minAlloc, height * totCont); + + // loop in case the periodicity of the reservation is smaller than LCM + long rStart = initTime % period; + long rEnd = initTime % period + duration; + + + // handle wrap-around + if (period > 1 && rEnd > period) { + long diff = rEnd - period; + rEnd = period; + + // handle multiple wrap-arounds (e.g., 5h duration on a 2h periodicity) + if(duration > period) { + rle.addInterval(new ReservationInterval(0, period), + Resources.multiply(alloc, duration / period - 1)); + rle.addInterval(new ReservationInterval(0, diff % period), alloc); + } else { + rle.addInterval(new ReservationInterval(0, diff), alloc); + } + } + + + rle.addInterval(new ReservationInterval(rStart, rEnd), alloc); + return rle; + } + + public abstract SharingPolicy getInitializedPolicy(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 5337e061be9..eef86a44990 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/****************************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - *******************************************************************************/ + *****************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import static org.mockito.Matchers.any; @@ -466,4 +466,28 @@ public class ReservationSystemTestUtil { public static Resource calculateClusterResource(int numContainers) { return Resource.newInstance(numContainers * 1024, numContainers); } + + + public static Map toAllocation( + RLESparseResourceAllocation rle, long start, long end) { + Map resAlloc = new TreeMap<>(); + + for (Map.Entry e : rle.getCumulative().entrySet()) { + Long nextKey = rle.getCumulative().higherKey(e.getKey()); + if (nextKey == null) { + break; + } else { + if (e.getKey() >= start && e.getKey() <= end && nextKey >= start + && nextKey <= end) { + resAlloc.put(new ReservationInterval(e.getKey(), nextKey), + e.getValue()); + } + } + } + + return resAlloc; + } + + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 2dee60c2f65..d054d3a7e41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,269 +17,118 @@ *******************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - import java.io.IOException; -import java.util.Map; -import java.util.TreeMap; +import java.util.Arrays; +import java.util.Collection; -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import net.jcip.annotations.NotThreadSafe; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestCapacityOverTimePolicy { +/** + * This class tests the {@code CapacityOvertimePolicy} sharing policy. + */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +@SuppressWarnings("VisibilityModifier") +public class TestCapacityOverTimePolicy extends BaseSharingPolicyTest { - long timeWindow; - long step; - float avgConstraint; - float instConstraint; - long initTime; + final static long ONEDAY = 86400 * 1000; + final static long ONEHOUR = 3600 * 1000; + final static long ONEMINUTE = 60 * 1000; + final static String TWODAYPERIOD = "7200000"; + final static String ONEDAYPERIOD = "86400000"; - InMemoryPlan plan; - ReservationAgent mAgent; - Resource minAlloc; - ResourceCalculator res; - Resource maxAlloc; + @Parameterized.Parameters(name = "Duration {0}, height {1}," + + " submission {2}, periodic {3})") + public static Collection data() { + return Arrays.asList(new Object[][] { - int totCont = 1000000; + // easy fit + {ONEHOUR, 0.25, 1, null, null }, + {ONEHOUR, 0.25, 1, TWODAYPERIOD, null }, + {ONEHOUR, 0.25, 1, ONEDAYPERIOD, null }, - @Before - public void setup() throws Exception { + // instantaneous high, but fit integral and inst limits + {ONEMINUTE, 0.74, 1, null, null }, + {ONEMINUTE, 0.74, 1, TWODAYPERIOD, null }, + {ONEMINUTE, 0.74, 1, ONEDAYPERIOD, null }, + + // barely fit + {ONEHOUR, 0.76, 1, null, PlanningQuotaException.class }, + {ONEHOUR, 0.76, 1, TWODAYPERIOD, PlanningQuotaException.class }, + {ONEHOUR, 0.76, 1, ONEDAYPERIOD, PlanningQuotaException.class }, + + // overcommit with single reservation + {ONEHOUR, 1.1, 1, null, PlanningQuotaException.class }, + {ONEHOUR, 1.1, 1, TWODAYPERIOD, PlanningQuotaException.class }, + {ONEHOUR, 1.1, 1, ONEDAYPERIOD, PlanningQuotaException.class }, + + // barely fit with multiple reservations (instantaneously, lowering to + // 1min to fit integral) + {ONEMINUTE, 0.25, 3, null, null }, + {ONEMINUTE, 0.25, 3, TWODAYPERIOD, null }, + {ONEMINUTE, 0.25, 3, ONEDAYPERIOD, null }, + + // overcommit with multiple reservations (instantaneously) + {ONEMINUTE, 0.25, 4, null, PlanningQuotaException.class }, + {ONEMINUTE, 0.25, 4, TWODAYPERIOD, PlanningQuotaException.class }, + {ONEMINUTE, 0.25, 4, ONEDAYPERIOD, PlanningQuotaException.class }, + + // (non-periodic) reservation longer than window + {25 * ONEHOUR, 0.25, 1, null, PlanningQuotaException.class }, + {25 * ONEHOUR, 0.25, 1, TWODAYPERIOD, PlanningQuotaException.class }, + {25 * ONEHOUR, 0.25, 1, ONEDAYPERIOD, PlanningQuotaException.class }, + + // (non-periodic) reservation longer than window + {25 * ONEHOUR, 0.05, 5, null, PlanningQuotaException.class }, + {25 * ONEHOUR, 0.05, 5, TWODAYPERIOD, PlanningQuotaException.class }, + {25 * ONEHOUR, 0.05, 5, ONEDAYPERIOD, PlanningQuotaException.class }, + + // overcommit integral + {ONEDAY, 0.26, 1, null, PlanningQuotaException.class }, + {2 * ONEHOUR, 0.26, 1, TWODAYPERIOD, PlanningQuotaException.class }, + {2 * ONEDAY, 0.26, 1, ONEDAYPERIOD, PlanningQuotaException.class }, + + // overcommit integral + {ONEDAY / 2, 0.51, 1, null, PlanningQuotaException.class }, + {2 * ONEHOUR / 2, 0.51, 1, TWODAYPERIOD, + PlanningQuotaException.class }, + {2 * ONEDAY / 2, 0.51, 1, ONEDAYPERIOD, PlanningQuotaException.class } + + }); + } + + @Override + public SharingPolicy getInitializedPolicy() { // 24h window - timeWindow = 86400000L; + long timeWindow = 86400000L; + // 1 sec step - step = 1000L; + long step = 1000L; // 25% avg cap on capacity - avgConstraint = 25; + float avgConstraint = 25; // 70% instantaneous cap on capacity - instConstraint = 70; + float instConstraint = 75; - initTime = System.currentTimeMillis(); - minAlloc = Resource.newInstance(1024, 1); - res = new DefaultResourceCalculator(); - maxAlloc = Resource.newInstance(1024 * 8, 8); - - mAgent = mock(ReservationAgent.class); - QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); String reservationQ = ReservationSystemTestUtil.getFullReservationQueueName(); - Resource clusterResource = - ReservationSystemTestUtil.calculateClusterResource(totCont); - ReservationSchedulerConfiguration conf = - ReservationSystemTestUtil.createConf(reservationQ, timeWindow, - instConstraint, avgConstraint); + conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); policy.init(reservationQ, conf); - RMContext context = ReservationSystemTestUtil.createMockRMContext(); - - plan = - new InMemoryPlan(rootQueueMetrics, policy, mAgent, - clusterResource, step, res, minAlloc, maxAlloc, - "dedicated", null, true, context); - } - - public int[] generateData(int length, int val) { - int[] data = new int[length]; - for (int i = 0; i < length; i++) { - data[i] = val; - } - return data; + return policy; } @Test - public void testSimplePass() throws IOException, PlanningException { - // generate allocation that simply fit within all constraints - int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); - - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); + public void testAllocation() throws IOException, PlanningException { + runTest(); } - @Test(expected = PlanningException.class) - public void testAllocationLargerThanValidWindow() throws IOException, - PlanningException { - // generate allocation that exceed the validWindow - int[] f = generateData(25*3600, (int) Math.ceil(0.69 * totCont)); - - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - - @Test - public void testSimplePass2() throws IOException, PlanningException { - // generate allocation from single tenant that exceed avg momentarily but - // fit within - // max instantanesou - int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - - @Test - public void testMultiTenantPass() throws IOException, PlanningException { - // generate allocation from multiple tenants that barely fit in tot capacity - int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - for (int i = 0; i < 4; i++) { - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - } - - @Test(expected = PlanningQuotaException.class) - public void testMultiTenantFail() throws IOException, PlanningException { - // generate allocation from multiple tenants that exceed tot capacity - int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - for (int i = 0; i < 5; i++) { - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - } - - @Test(expected = PlanningQuotaException.class) - public void testInstFail() throws IOException, PlanningException { - // generate allocation that exceed the instantaneous cap single-show - int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - Assert.fail("should not have accepted this"); - } - - @Test - public void testInstFailBySum() throws IOException, PlanningException { - // generate allocation that exceed the instantaneous cap by sum - int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - try { - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - Assert.fail(); - } catch (PlanningQuotaException p) { - // expected - } - } - - @Test(expected = PlanningQuotaException.class) - public void testFailAvg() throws IOException, PlanningException { - // generate an allocation which violates the 25% average single-shot - Map req = - new TreeMap(); - long win = timeWindow / 2 + 100; - int cont = (int) Math.ceil(0.5 * totCont); - req.put(new ReservationInterval(initTime, initTime + win), - ReservationSystemUtil.toResource( - ReservationRequest.newInstance(Resource.newInstance(1024, 1), - cont))); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + win, win); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + win, req, res, minAlloc), false)); - } - - @Test - public void testFailAvgBySum() throws IOException, PlanningException { - // generate an allocation which violates the 25% average by sum - Map req = - new TreeMap(); - long win = 86400000 / 4 + 1; - int cont = (int) Math.ceil(0.5 * totCont); - req.put(new ReservationInterval(initTime, initTime + win), - ReservationSystemUtil.toResource(ReservationRequest.newInstance(Resource - .newInstance(1024, 1), cont))); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + win, win); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + win, req, res, minAlloc), false)); - - try { - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", - "dedicated", initTime, initTime + win, req, res, minAlloc), false)); - - Assert.fail("should not have accepted this"); - } catch (PlanningQuotaException e) { - // expected - } - } - -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index c5edaf000e8..accdf24a02a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,145 +17,70 @@ *******************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import net.jcip.annotations.NotThreadSafe; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.junit.Before; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestNoOverCommitPolicy { +/** + * This clas tests {@code NoOverCommitPolicy} sharing policy. + */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +@SuppressWarnings("VisibilityModifier") +public class TestNoOverCommitPolicy extends BaseSharingPolicyTest { - long step; - long initTime; + final static long ONEHOUR = 3600 * 1000; + final static String TWOHOURPERIOD = "7200000"; - InMemoryPlan plan; - ReservationAgent mAgent; - Resource minAlloc; - ResourceCalculator res; - Resource maxAlloc; + @Parameterized.Parameters(name = "Duration {0}, height {1}," + + " submissions {2}, periodic {3})") + public static Collection data() { + return Arrays.asList(new Object[][] { - int totCont = 1000000; + // easy fit + {ONEHOUR, 0.25, 1, null, null }, + {ONEHOUR, 0.25, 1, TWOHOURPERIOD, null }, - @Before - public void setup() throws Exception { + // barely fit + {ONEHOUR, 1, 1, null, null }, + {ONEHOUR, 1, 1, TWOHOURPERIOD, null }, - // 1 sec step - step = 1000L; + // overcommit with single reservation + {ONEHOUR, 1.1, 1, null, ResourceOverCommitException.class }, + {ONEHOUR, 1.1, 1, TWOHOURPERIOD, ResourceOverCommitException.class }, - initTime = System.currentTimeMillis(); - minAlloc = Resource.newInstance(1024, 1); - res = new DefaultResourceCalculator(); - maxAlloc = Resource.newInstance(1024 * 8, 8); + // barely fit with multiple reservations + {ONEHOUR, 0.25, 4, null, null }, + {ONEHOUR, 0.25, 4, TWOHOURPERIOD, null }, - mAgent = mock(ReservationAgent.class); + // overcommit with multiple reservations + {ONEHOUR, 0.25, 5, null, ResourceOverCommitException.class }, + {ONEHOUR, 0.25, 5, TWOHOURPERIOD, ResourceOverCommitException.class } + + }); + } + + @Override + public SharingPolicy getInitializedPolicy() { String reservationQ = ReservationSystemTestUtil.getFullReservationQueueName(); - QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); - Resource clusterResource = - ReservationSystemTestUtil.calculateClusterResource(totCont); - ReservationSchedulerConfiguration conf = mock - (ReservationSchedulerConfiguration.class); - NoOverCommitPolicy policy = new NoOverCommitPolicy(); + conf = new CapacitySchedulerConfiguration(); + SharingPolicy policy = new NoOverCommitPolicy(); policy.init(reservationQ, conf); - RMContext context = ReservationSystemTestUtil.createMockRMContext(); - - plan = - new InMemoryPlan(rootQueueMetrics, policy, mAgent, - clusterResource, step, res, minAlloc, maxAlloc, - "dedicated", null, true, context); - } - - public int[] generateData(int length, int val) { - int[] data = new int[length]; - for (int i = 0; i < length; i++) { - data[i] = val; - } - return data; + return policy; } @Test - public void testSingleUserEasyFitPass() throws IOException, PlanningException { - // generate allocation that easily fit within resource constraints - int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); + public void testAllocation() throws IOException, PlanningException { + runTest(); } - @Test - public void testSingleUserBarelyFitPass() throws IOException, - PlanningException { - // generate allocation from single tenant that barely fit - int[] f = generateData(3600, totCont); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - - @Test(expected = ResourceOverCommitException.class) - public void testSingleFail() throws IOException, PlanningException { - // generate allocation from single tenant that exceed capacity - int[] f = generateData(3600, (int) (1.1 * totCont)); - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), null, "u1", - "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil - .generateAllocation(initTime, step, f), res, minAlloc), false); - } - - @Test - public void testMultiTenantPass() throws IOException, PlanningException { - // generate allocation from multiple tenants that barely fit in tot capacity - int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - for (int i = 0; i < 4; i++) { - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - } - - @Test(expected = ResourceOverCommitException.class) - public void testMultiTenantFail() throws IOException, PlanningException { - // generate allocation from multiple tenants that exceed tot capacity - int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); - ReservationDefinition rDef = - ReservationSystemTestUtil.createSimpleReservationDefinition( - initTime, initTime + f.length + 1, f.length); - for (int i = 0; i < 5; i++) { - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation( - ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, - "dedicated", initTime, initTime + f.length, - ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc), false)); - } - } -} +} \ No newline at end of file