YARN-5331. Extend RLESparseResourceAllocation with period for supporting recurring reservations in YARN ReservationSystem. (Sangeetha Abdu Jyothi via Subru).

(cherry picked from commit 6bf42e48ef)
This commit is contained in:
Subru Krishnan 2017-05-01 18:48:36 -07:00
parent 2030f408e0
commit 6d53224afc
5 changed files with 429 additions and 1 deletions

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This data structure stores a periodic RLESparseResourceAllocation.
* Default period is 1 day (86400000ms).
*/
public class PeriodicRLESparseResourceAllocation extends
RLESparseResourceAllocation {
// Log
private static final Logger LOG = LoggerFactory
.getLogger(PeriodicRLESparseResourceAllocation.class);
private long timePeriod;
/**
* Constructor.
*
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
encoded data.
* @param timePeriod Time period in milliseconds.
*/
public PeriodicRLESparseResourceAllocation(
RLESparseResourceAllocation rleVector, Long timePeriod) {
super(rleVector.getCumulative(), rleVector.getResourceCalculator());
this.timePeriod = timePeriod;
}
/**
* Constructor. Default time period set to 1 day.
*
* @param rleVector {@link RLESparseResourceAllocation} with the run-length
encoded data.
*/
public PeriodicRLESparseResourceAllocation(
RLESparseResourceAllocation rleVector) {
this(rleVector, 86400000L);
}
/**
* Get capacity at time based on periodic repetition.
*
* @param tick UTC time for which the allocated {@link Resource} is queried.
* @return {@link Resource} allocated at specified time
*/
public Resource getCapacityAtTime(long tick) {
long convertedTime = (tick % timePeriod);
return super.getCapacityAtTime(convertedTime);
}
/**
* Add resource for the specified interval. This function will be used by
* {@link InMemoryPlan} while placing reservations between 0 and timePeriod.
* The interval may include 0, but the end time must be strictly less than
* timePeriod.
*
* @param interval {@link ReservationInterval} to which the specified
* resource is to be added.
* @param resource {@link Resource} to be added to the interval specified.
* @return true if addition is successful, false otherwise
*/
public boolean addInterval(ReservationInterval interval,
Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
return super.addInterval(interval, resource);
} else {
LOG.info("Cannot set capacity beyond end time: " + timePeriod);
return false;
}
}
/**
* Removes a resource for the specified interval.
*
* @param interval the {@link ReservationInterval} for which the resource is
* to be removed.
* @param resource the {@link Resource} to be removed.
* @return true if removal is successful, false otherwise
*/
public boolean removeInterval(
ReservationInterval interval, Resource resource) {
long startTime = interval.getStartTime();
long endTime = interval.getEndTime();
// If the resource to be subtracted is less than the minimum resource in
// the range, abort removal to avoid negative capacity.
if (!Resources.fitsIn(
resource, super.getMinimumCapacityInInterval(interval))) {
LOG.info("Request to remove more resources than what is available");
return false;
}
if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) {
return super.removeInterval(interval, resource);
} else {
LOG.info("Interval extends beyond the end time " + timePeriod);
return false;
}
}
/**
* Get maximum capacity at periodic offsets from the specified time.
*
* @param tick UTC time base from which offsets are specified for finding
* the maximum capacity.
* @param period periodic offset at which capacities are evaluted.
* @return the maximum {@link Resource} across the specified time instants.
* @return true if removal is successful, false otherwise
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxResource;
if (period < timePeriod) {
maxResource =
super.getMaximumPeriodicCapacity(tick % timePeriod, period);
} else {
// if period is greater than the length of PeriodicRLESparseAllocation,
// only a single value exists in this interval.
maxResource = super.getCapacityAtTime(tick % timePeriod);
}
return maxResource;
}
/**
* Get time period of PeriodicRLESparseResourceAllocation.
*
* @return timePeriod time period represented in ms.
*/
public long getTimePeriod() {
return this.timePeriod;
}
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append("Period: ").append(timePeriod).append("\n")
.append(super.toString());
if (super.isEmpty()) {
ret.append(" no allocations\n");
}
return ret.toString();
}
}

View File

@ -132,6 +132,7 @@ public class RLESparseResourceAllocation {
* Returns the capacity, i.e. total resources allocated at the specified point
* of time.
*
* @param tick timeStap at which resource needs to be known
* @return the resources allocated at the specified time
*/
public Resource getCapacityAtTime(long tick) {
@ -309,6 +310,10 @@ public class RLESparseResourceAllocation {
}
}
public ResourceCalculator getResourceCalculator() {
return resourceCalculator;
}
/**
* Merges the range start to end of two {@code RLESparseResourceAllocation}
* using a given {@code RLEOperator}.
@ -533,4 +538,50 @@ public class RLESparseResourceAllocation {
add, subtract, min, max, subtractTestNonNegative
}
/**
* Get the maximum capacity across specified time instances. The search-space
* is specified using the starting value, tick, and the periodic interval for
* search. Maximum resource allocation across tick, tick + period,
* tick + 2 * period,..., tick + n * period .. is returned.
*
* @param tick the starting time instance
* @param period interval at which capacity is evaluated
* @return maximum resource allocation
*/
public Resource getMaximumPeriodicCapacity(long tick, long period) {
Resource maxCapacity = ZERO_RESOURCE;
if (!cumulativeCapacity.isEmpty()) {
Long lastKey = cumulativeCapacity.lastKey();
for (long t = tick; t <= lastKey; t = t + period) {
maxCapacity = Resources.componentwiseMax(maxCapacity,
cumulativeCapacity.floorEntry(t).getValue());
}
}
return maxCapacity;
}
/**
* Get the minimum capacity in the specified time range.
*
* @param interval the {@link ReservationInterval} to be searched
* @return minimum resource allocation
*/
public Resource getMinimumCapacityInInterval(ReservationInterval interval) {
Resource minCapacity = Resource.newInstance(
Integer.MAX_VALUE, Integer.MAX_VALUE);
long start = interval.getStartTime();
long end = interval.getEndTime();
NavigableMap<Long, Resource> capacityRange =
this.getRangeOverlapping(start, end).getCumulative();
if (!capacityRange.isEmpty()) {
for (Map.Entry<Long, Resource> entry : capacityRange.entrySet()) {
if (entry.getValue() != null) {
minCapacity = Resources.componentwiseMin(minCapacity,
entry.getValue());
}
}
}
return minCapacity;
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Assert;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -414,6 +415,19 @@ public class ReservationSystemTestUtil {
return req;
}
public static RLESparseResourceAllocation
generateRLESparseResourceAllocation(int[] alloc, long[] timeSteps) {
TreeMap<Long, Resource> allocationsMap = new TreeMap<>();
for (int i = 0; i < alloc.length; i++) {
allocationsMap.put(timeSteps[i],
Resource.newInstance(alloc[i], alloc[i]));
}
RLESparseResourceAllocation rleVector =
new RLESparseResourceAllocation(allocationsMap,
new DefaultResourceCalculator());
return rleVector;
}
public static Resource calculateClusterResource(int numContainers) {
return Resource.newInstance(numContainers * 1024, numContainers);
}

View File

@ -0,0 +1,142 @@
/******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Testing the class PeriodicRLESparseResourceAllocation.
*/
public class TestPeriodicRLESparseResourceAllocation {
private static final Logger LOG = LoggerFactory
.getLogger(TestPeriodicRLESparseResourceAllocation.class);
@Test
public void testPeriodicCapacity() {
int[] alloc = {10, 7, 5, 2, 0};
long[] timeSteps = {0L, 5L, 10L, 15L, 19L};
RLESparseResourceAllocation rleSparseVector =
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 20L);
LOG.info(periodicVector.toString());
Assert.assertEquals(Resource.newInstance(5, 5),
periodicVector.getCapacityAtTime(10L));
Assert.assertEquals(Resource.newInstance(10, 10),
periodicVector.getCapacityAtTime(20L));
Assert.assertEquals(Resource.newInstance(7, 7),
periodicVector.getCapacityAtTime(27L));
Assert.assertEquals(Resource.newInstance(5, 5),
periodicVector.getCapacityAtTime(50L));
}
@Test
public void testMaxPeriodicCapacity() {
int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
RLESparseResourceAllocation rleSparseVector =
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 8L);
LOG.info(periodicVector.toString());
Assert.assertEquals(
periodicVector.getMaximumPeriodicCapacity(0, 1),
Resource.newInstance(10, 10));
Assert.assertEquals(
periodicVector.getMaximumPeriodicCapacity(8, 2),
Resource.newInstance(7, 7));
Assert.assertEquals(
periodicVector.getMaximumPeriodicCapacity(16, 3),
Resource.newInstance(10, 10));
Assert.assertEquals(
periodicVector.getMaximumPeriodicCapacity(17, 4),
Resource.newInstance(5, 5));
Assert.assertEquals(
periodicVector.getMaximumPeriodicCapacity(32, 5),
Resource.newInstance(4, 4));
}
@Test
public void testSetCapacityInInterval() {
int[] alloc = {2, 5, 0};
long[] timeSteps = {1L, 2L, 3L};
RLESparseResourceAllocation rleSparseVector =
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
ReservationInterval interval = new ReservationInterval(5L, 10L);
periodicVector.addInterval(
interval, Resource.newInstance(8, 8));
Assert.assertEquals(Resource.newInstance(8, 8),
periodicVector.getCapacityAtTime(5L));
Assert.assertEquals(Resource.newInstance(8, 8),
periodicVector.getCapacityAtTime(9L));
Assert.assertEquals(Resource.newInstance(0, 0),
periodicVector.getCapacityAtTime(10L));
Assert.assertEquals(Resource.newInstance(0, 0),
periodicVector.getCapacityAtTime(0L));
Assert.assertFalse(periodicVector.addInterval(
new ReservationInterval(7L, 12L), Resource.newInstance(8, 8)));
}
public void testRemoveInterval() {
int[] alloc = {2, 5, 3, 4, 0};
long[] timeSteps = {1L, 3L, 5L, 7L, 9L};
RLESparseResourceAllocation rleSparseVector =
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
alloc, timeSteps);
PeriodicRLESparseResourceAllocation periodicVector =
new PeriodicRLESparseResourceAllocation(rleSparseVector, 10L);
ReservationInterval interval = new ReservationInterval(3L, 7L);
Assert.assertTrue(periodicVector.removeInterval(
interval, Resource.newInstance(3, 3)));
Assert.assertEquals(Resource.newInstance(2, 2),
periodicVector.getCapacityAtTime(1L));
Assert.assertEquals(Resource.newInstance(2, 2),
periodicVector.getCapacityAtTime(2L));
Assert.assertEquals(Resource.newInstance(2, 2),
periodicVector.getCapacityAtTime(3L));
Assert.assertEquals(Resource.newInstance(2, 2),
periodicVector.getCapacityAtTime(4L));
Assert.assertEquals(Resource.newInstance(0, 0),
periodicVector.getCapacityAtTime(5L));
Assert.assertEquals(Resource.newInstance(0, 0),
periodicVector.getCapacityAtTime(6L));
Assert.assertEquals(Resource.newInstance(4, 4),
periodicVector.getCapacityAtTime(7L));
// invalid interval
Assert.assertFalse(periodicVector.removeInterval(
new ReservationInterval(7L, 12L), Resource.newInstance(1, 1)));
// invalid capacity
Assert.assertFalse(periodicVector.removeInterval(
new ReservationInterval(2L, 4L), Resource.newInstance(8, 8)));
}
}

View File

@ -524,7 +524,61 @@ public class TestRLESparseResourceAllocation {
}
}
private void setupArrays(TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
@Test
public void testMaxPeriodicCapacity() {
long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
int[] alloc = {2, 5, 7, 10, 3, 4, 6, 8};
RLESparseResourceAllocation rleSparseVector =
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
alloc, timeSteps);
LOG.info(rleSparseVector.toString());
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(0, 1),
Resource.newInstance(10, 10));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(0, 2),
Resource.newInstance(7, 7));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(0, 3),
Resource.newInstance(10, 10));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(0, 4),
Resource.newInstance(3, 3));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(0, 5),
Resource.newInstance(4, 4));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(0, 5),
Resource.newInstance(4, 4));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(7, 5),
Resource.newInstance(8, 8));
Assert.assertEquals(
rleSparseVector.getMaximumPeriodicCapacity(10, 3),
Resource.newInstance(0, 0));
}
@Test
public void testGetMinimumCapacityInInterval() {
long[] timeSteps = {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L};
int[] alloc = {2, 5, 7, 10, 3, 4, 0, 8};
RLESparseResourceAllocation rleSparseVector =
ReservationSystemTestUtil.generateRLESparseResourceAllocation(
alloc, timeSteps);
LOG.info(rleSparseVector.toString());
Assert.assertEquals(
rleSparseVector.getMinimumCapacityInInterval(
new ReservationInterval(1L, 3L)), Resource.newInstance(5, 5));
Assert.assertEquals(
rleSparseVector.getMinimumCapacityInInterval(
new ReservationInterval(2L, 5L)), Resource.newInstance(3, 3));
Assert.assertEquals(
rleSparseVector.getMinimumCapacityInInterval(
new ReservationInterval(1L, 7L)), Resource.newInstance(0, 0));
}
private void setupArrays(
TreeMap<Long, Resource> a, TreeMap<Long, Resource> b) {
a.put(10L, Resource.newInstance(5, 5));
a.put(20L, Resource.newInstance(10, 10));
a.put(30L, Resource.newInstance(15, 15));