diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fa3061a4749..3f1551fbbc2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -87,6 +87,9 @@ Release 2.8.0 - UNRELEASED YARN-1402. Update related Web UI and CLI with exposing client API to check log aggregation status. (Xuan Gong via junping_du) + YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler. + (Craig Welch via wangda) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 87a2a00dd73..2ab41979306 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -550,9 +550,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // lock the leafqueue while we scan applications and unreserve synchronized (qT.leafQueue) { - NavigableSet ns = - (NavigableSet) qT.leafQueue.getApplications(); - Iterator desc = ns.descendingIterator(); + Iterator desc = + qT.leafQueue.getOrderingPolicy().getPreemptionIterator(); qT.actuallyPreempted = Resources.clone(resToObtain); while (desc.hasNext()) { FiCaSchedulerApp fc = desc.next(); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 48233903bc6..0554c041380 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -72,7 +74,7 @@ import com.google.common.collect.Multiset; */ @Private @Unstable -public class SchedulerApplicationAttempt { +public class SchedulerApplicationAttempt implements SchedulableEntity { private static final Log LOG = LogFactory .getLog(SchedulerApplicationAttempt.class); @@ -710,4 +712,24 @@ public class SchedulerApplicationAttempt { public ResourceUsage getAppAttemptResourceUsage() { return this.attemptResourceUsage; } + + @Override + public String getId() { + return getApplicationId().toString(); + } + + @Override + public int compareInputOrderTo(SchedulableEntity other) { + if (other instanceof SchedulerApplicationAttempt) { + return getApplicationId().compareTo( + 
((SchedulerApplicationAttempt)other).getApplicationId()); + } + return 1;//let other types go before this, if any + } + + @Override + public synchronized ResourceUsage getSchedulingResourceUsage() { + return attemptResourceUsage; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 4e8d61769ec..c9e83a13435 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; + + import com.google.common.collect.ImmutableSet; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -116,7 +119,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String MAXIMUM_ALLOCATION_VCORES = "maximum-allocation-vcores"; - + + public static final String ORDERING_POLICY = "ordering-policy"; + + public static final String DEFAULT_ORDERING_POLICY = "fifo"; + @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -378,6 +385,28 @@ public class 
CapacitySchedulerConfiguration extends ReservationSchedulerConfigur DEFAULT_USER_LIMIT); return userLimit; } + + @SuppressWarnings("unchecked") + public OrderingPolicy getOrderingPolicy( + String queue) { + + String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, + DEFAULT_ORDERING_POLICY); + + OrderingPolicy orderingPolicy; + + if (policyType.trim().equals("fifo")) { + policyType = FifoOrderingPolicy.class.getName(); + } + try { + orderingPolicy = (OrderingPolicy) + Class.forName(policyType).newInstance(); + } catch (Exception e) { + String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage(); + throw new RuntimeException(message, e); + } + return orderingPolicy; + } public void setUserLimit(String queue, int userLimit) { setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f860574a816..452d4a8ba8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -93,7 +94,6 @@ public class LeafQueue extends AbstractCSQueue { private int nodeLocalityDelay; - Set activeApplications; Map applicationAttemptMap = new HashMap(); @@ -121,6 +121,9 @@ public class LeafQueue extends AbstractCSQueue { private volatile ResourceLimits currentResourceLimits = null; + private OrderingPolicy + orderingPolicy = new FifoOrderingPolicy(); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -137,7 +140,6 @@ public class LeafQueue extends AbstractCSQueue { cs.getApplicationComparator(); this.pendingApplications = new TreeSet(applicationComparator); - this.activeApplications = new TreeSet(applicationComparator); setupQueueConfigs(cs.getClusterResource()); } @@ -159,6 +161,9 @@ public class LeafQueue extends AbstractCSQueue { setQueueResourceLimitsInfo(clusterResource); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + + setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); + userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -322,7 +327,7 @@ public class LeafQueue extends AbstractCSQueue { } public synchronized int getNumActiveApplications() { - return activeApplications.size(); + return orderingPolicy.getNumSchedulableEntities(); } @Private @@ -637,7 +642,7 @@ public class LeafQueue extends AbstractCSQueue { } } user.activateApplication(); - activeApplications.add(application); + orderingPolicy.addSchedulableEntity(application); queueUsage.incAMUsed(application.getAMResource()); user.getResourceUsage().incAMUsed(application.getAMResource()); i.remove(); 
@@ -686,7 +691,8 @@ public class LeafQueue extends AbstractCSQueue { public synchronized void removeApplicationAttempt( FiCaSchedulerApp application, User user) { - boolean wasActive = activeApplications.remove(application); + boolean wasActive = + orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { pendingApplications.remove(application); } else { @@ -727,7 +733,8 @@ public class LeafQueue extends AbstractCSQueue { if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + activeApplications.size()); + + " #applications=" + + orderingPolicy.getNumSchedulableEntities()); } // Check for reserved resources @@ -759,9 +766,10 @@ public class LeafQueue extends AbstractCSQueue { return NULL_ASSIGNMENT; } - // Try to assign containers to applications in order - for (FiCaSchedulerApp application : activeApplications) { - + for (Iterator assignmentIterator = + orderingPolicy.getAssignmentIterator(); + assignmentIterator.hasNext();) { + FiCaSchedulerApp application = assignmentIterator.next(); if(LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + application.getApplicationId()); @@ -1606,6 +1614,9 @@ public class LeafQueue extends AbstractCSQueue { // Inform the node node.allocateContainer(allocatedContainer); + + // Inform the ordering policy + orderingPolicy.containerAllocated(application, allocatedContainer); LOG.info("assignedContainer" + " application attempt=" + application.getApplicationAttemptId() + @@ -1715,11 +1726,16 @@ public class LeafQueue extends AbstractCSQueue { removed = application.containerCompleted(rmContainer, containerStatus, event, node.getPartition()); + node.releaseContainer(container); } // Book-keeping if (removed) { + + // Inform the ordering policy + orderingPolicy.containerReleased(application, rmContainer); + releaseResource(clusterResource, application, container.getResource(), node.getPartition()); LOG.info("completedContainer" + @@ -1822,7 +1838,8 
@@ public class LeafQueue extends AbstractCSQueue { activateApplications(); // Update application properties - for (FiCaSchedulerApp application : activeApplications) { + for (FiCaSchedulerApp application : + orderingPolicy.getSchedulableEntities()) { synchronized (application) { computeUserLimitAndSetHeadroom(application, clusterResource, Resources.none(), RMNodeLabelsManager.NO_LABEL, @@ -1916,19 +1933,19 @@ public class LeafQueue extends AbstractCSQueue { } getParent().recoverContainer(clusterResource, attempt, rmContainer); } - + /** * Obtain (read-only) collection of active applications. */ - public Set getApplications() { - // need to access the list of apps from the preemption monitor - return activeApplications; + public Collection getApplications() { + return orderingPolicy.getSchedulableEntities(); } // return a single Resource capturing the overal amount of pending resources public synchronized Resource getTotalResourcePending() { Resource ret = BuilderUtils.newResource(0, 0); - for (FiCaSchedulerApp f : activeApplications) { + for (FiCaSchedulerApp f : + orderingPolicy.getSchedulableEntities()) { Resources.addTo(ret, f.getTotalPendingRequests()); } return ret; @@ -1940,7 +1957,8 @@ public class LeafQueue extends AbstractCSQueue { for (FiCaSchedulerApp pendingApp : pendingApplications) { apps.add(pendingApp.getApplicationAttemptId()); } - for (FiCaSchedulerApp app : activeApplications) { + for (FiCaSchedulerApp app : + orderingPolicy.getSchedulableEntities()) { apps.add(app.getApplicationAttemptId()); } } @@ -1993,6 +2011,19 @@ public class LeafQueue extends AbstractCSQueue { this.maxApplications = maxApplications; } + public synchronized OrderingPolicy + getOrderingPolicy() { + return orderingPolicy; + } + + public synchronized void setOrderingPolicy( + OrderingPolicy orderingPolicy) { + orderingPolicy.addAllSchedulableEntities( + this.orderingPolicy.getSchedulableEntities() + ); + this.orderingPolicy = orderingPolicy; + } + /* * Holds shared values 
used by all applications in * the queue to calculate headroom on demand diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index 5b32b3dee8d..e046fcf66bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -68,15 +68,6 @@ public abstract class AbstractComparatorOrderingPolicy comparator) { - this.comparator = comparator; - TreeSet schedulableEntities = new TreeSet(comparator); - if (this.schedulableEntities != null) { - schedulableEntities.addAll(this.schedulableEntities); - } - this.schedulableEntities = schedulableEntities; - } - @VisibleForTesting public Comparator getComparator() { return comparator; @@ -103,7 +94,7 @@ public abstract class AbstractComparatorOrderingPolicy conf); @Override public abstract void containerAllocated(S schedulableEntity, @@ -114,6 +105,6 @@ public abstract class AbstractComparatorOrderingPolicy extends AbstractComparatorOrderingPolicy { public FifoOrderingPolicy() { - setComparator(new FifoComparator()); + this.comparator = new FifoComparator(); + this.schedulableEntities = new TreeSet(comparator); } @Override - public void configure(String conf) { + public void configure(Map conf) { } @@ -47,7 +48,7 @@ public class FifoOrderingPolicy extends AbstractCom } @Override - public String getStatusMessage() { + 
public String getInfo() { return "FifoOrderingPolicy"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java index f907cea2001..aebdcdee990 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -83,7 +83,7 @@ public interface OrderingPolicy { * Provides configuration information for the policy from the scheduler * configuration */ - public void configure(String conf); + public void configure(Map conf); /** * The passed SchedulableEntity has been allocated the passed Container, @@ -104,6 +104,6 @@ public interface OrderingPolicy { /** * Display information regarding configuration & status */ - public String getStatusMessage(); + public String getInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index 6c34d99310e..2eeda6652bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -94,6 +94,7 @@ class CapacitySchedulerPage extends RmView { _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). _("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())). _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())). + _("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()). _("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled"); html._(InfoBlock.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index 5258b3d1a2c..dc610784c81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; @@ -39,6 +40,9 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { protected ResourceInfo usedAMResource; 
protected ResourceInfo userAMResourceLimit; protected boolean preemptionDisabled; + + @XmlTransient + protected String orderingPolicyInfo; CapacitySchedulerLeafQueueInfo() { }; @@ -57,6 +61,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed()); userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit()); preemptionDisabled = q.getPreemptionDisabled(); + orderingPolicyInfo = q.getOrderingPolicy().getInfo(); } public int getNumActiveApplications() { @@ -107,4 +112,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo { public boolean getPreemptionDisabled() { return preemptionDisabled; } + + public String getOrderingPolicyInfo() { + return orderingPolicyInfo; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8f5237e1a3f..9e8b7698eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -38,6 +39,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Comparator; @@ -46,6 +49,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Iterator; import java.util.Map; import java.util.NavigableSet; import java.util.Random; @@ -1032,7 +1036,7 @@ public class TestProportionalCapacityPreemptionPolicy { when(lq.getTotalResourcePending()).thenReturn( Resource.newInstance(pending[i], 0)); // consider moving where CapacityScheduler::comparator accessible - NavigableSet qApps = new TreeSet( + final NavigableSet qApps = new TreeSet( new Comparator() { @Override public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { @@ -1056,6 +1060,14 @@ public class TestProportionalCapacityPreemptionPolicy { .thenReturn(appAttemptIdList); } when(lq.getApplications()).thenReturn(qApps); + @SuppressWarnings("unchecked") + OrderingPolicy so = mock(OrderingPolicy.class); + when(so.getPreemptionIterator()).thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return qApps.descendingIterator(); + } + }); + when(lq.getOrderingPolicy()).thenReturn(so); if(setAMResourcePercent != 0.0f){ when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index a41fdfa5aba..929b3e1d2ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -155,6 +155,7 @@ public class TestApplicationLimits { doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); doReturn(user).when(application).getUser(); doReturn(amResource).when(application).getAMResource(); + when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); return application; } @@ -469,7 +470,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_0)); + assertTrue(queue.getApplications().contains(app_0)); // Submit second application FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, @@ -479,7 +480,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_1)); + assertTrue(queue.getApplications().contains(app_1)); // Submit third application, should remain pending 
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0, @@ -508,7 +509,7 @@ public class TestApplicationLimits { assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); assertFalse(queue.pendingApplications.contains(app_2)); - assertFalse(queue.activeApplications.contains(app_2)); + assertFalse(queue.getApplications().contains(app_2)); // Finish 1st application, app_3 should become active queue.finishApplicationAttempt(app_0, A); @@ -516,9 +517,9 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertTrue(queue.activeApplications.contains(app_3)); + assertTrue(queue.getApplications().contains(app_3)); assertFalse(queue.pendingApplications.contains(app_3)); - assertFalse(queue.activeApplications.contains(app_0)); + assertFalse(queue.getApplications().contains(app_0)); // Finish 2nd application queue.finishApplicationAttempt(app_1, A); @@ -526,7 +527,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertFalse(queue.activeApplications.contains(app_1)); + assertFalse(queue.getApplications().contains(app_1)); // Finish 4th application queue.finishApplicationAttempt(app_3, A); @@ -534,7 +535,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications()); assertEquals(0, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); - assertFalse(queue.activeApplications.contains(app_3)); + assertFalse(queue.getApplications().contains(app_3)); } @Test diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 0a196041bd9..33b8f568f84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -381,6 +384,20 @@ public class TestLeafQueue { d.submitApplicationAttempt(app_1, user_d); // same user } + @Test + public void testPolicyConfiguration() throws Exception { + + CapacitySchedulerConfiguration testConf = + new CapacitySchedulerConfiguration(); + + String tproot = CapacitySchedulerConfiguration.ROOT + "." 
+ + "testPolicyRoot" + System.currentTimeMillis(); + + OrderingPolicy comPol = + testConf.getOrderingPolicy(tproot); + + + } @Test public void testAppAttemptMetrics() throws Exception { @@ -2011,7 +2028,7 @@ public class TestLeafQueue { e.submitApplicationAttempt(app_2, user_e); // same user // before reinitialization - assertEquals(2, e.activeApplications.size()); + assertEquals(2, e.getNumActiveApplications()); assertEquals(1, e.pendingApplications.size()); csConf.setDouble(CapacitySchedulerConfiguration @@ -2028,7 +2045,7 @@ public class TestLeafQueue { root.reinitialize(newRoot, csContext.getClusterResource()); // after reinitialization - assertEquals(3, e.activeApplications.size()); + assertEquals(3, e.getNumActiveApplications()); assertEquals(0, e.pendingApplications.size()); } @@ -2092,7 +2109,7 @@ public class TestLeafQueue { e.submitApplicationAttempt(app_2, user_e); // same user // before updating cluster resource - assertEquals(2, e.activeApplications.size()); + assertEquals(2, e.getNumActiveApplications()); assertEquals(1, e.pendingApplications.size()); Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); @@ -2100,7 +2117,7 @@ public class TestLeafQueue { new ResourceLimits(clusterResource)); // after updating cluster resource - assertEquals(3, e.activeApplications.size()); + assertEquals(3, e.getNumActiveApplications()); assertEquals(0, e.pendingApplications.size()); } @@ -2450,6 +2467,83 @@ public class TestLeafQueue { Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled"); } + } + + @Test + public void testFifoAssignment() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + a.setOrderingPolicy(new FifoOrderingPolicy()); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); + + final int numNodes = 4; + Resource clusterResource = 
Resources.createResource( + numNodes * (16*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + List app_1_requests_0 = new ArrayList(); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, 
recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + //Even though it already has more resources, app_0 will still get + //assigned first + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + //and only then will app_1 + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + } @Test