YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler. (Craig Welch via wangda)
This commit is contained in:
parent
c92f6f3605
commit
44872b76fc
|
@ -87,6 +87,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-1402. Update related Web UI and CLI with exposing client API to check
|
||||
log aggregation status. (Xuan Gong via junping_du)
|
||||
|
||||
YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler.
|
||||
(Craig Welch via wangda)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
||||
|
|
|
@ -550,9 +550,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
|||
|
||||
// lock the leafqueue while we scan applications and unreserve
|
||||
synchronized (qT.leafQueue) {
|
||||
NavigableSet<FiCaSchedulerApp> ns =
|
||||
(NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
|
||||
Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
|
||||
Iterator<FiCaSchedulerApp> desc =
|
||||
qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
|
||||
qT.actuallyPreempted = Resources.clone(resToObtain);
|
||||
while (desc.hasNext()) {
|
||||
FiCaSchedulerApp fc = desc.next();
|
||||
|
|
|
@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
|
||||
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -72,7 +74,7 @@ import com.google.common.collect.Multiset;
|
|||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class SchedulerApplicationAttempt {
|
||||
public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(SchedulerApplicationAttempt.class);
|
||||
|
@ -710,4 +712,24 @@ public class SchedulerApplicationAttempt {
|
|||
public ResourceUsage getAppAttemptResourceUsage() {
|
||||
return this.attemptResourceUsage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return getApplicationId().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareInputOrderTo(SchedulableEntity other) {
|
||||
if (other instanceof SchedulerApplicationAttempt) {
|
||||
return getApplicationId().compareTo(
|
||||
((SchedulerApplicationAttempt)other).getApplicationId());
|
||||
}
|
||||
return 1;//let other types go before this, if any
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ResourceUsage getSchedulingResourceUsage() {
|
||||
return attemptResourceUsage;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
|
||||
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
|
||||
|
@ -117,6 +120,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
public static final String MAXIMUM_ALLOCATION_VCORES =
|
||||
"maximum-allocation-vcores";
|
||||
|
||||
public static final String ORDERING_POLICY = "ordering-policy";
|
||||
|
||||
public static final String DEFAULT_ORDERING_POLICY = "fifo";
|
||||
|
||||
@Private
|
||||
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
|
||||
|
||||
|
@ -379,6 +386,28 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
return userLimit;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <S extends SchedulableEntity> OrderingPolicy<S> getOrderingPolicy(
|
||||
String queue) {
|
||||
|
||||
String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
|
||||
DEFAULT_ORDERING_POLICY);
|
||||
|
||||
OrderingPolicy<S> orderingPolicy;
|
||||
|
||||
if (policyType.trim().equals("fifo")) {
|
||||
policyType = FifoOrderingPolicy.class.getName();
|
||||
}
|
||||
try {
|
||||
orderingPolicy = (OrderingPolicy<S>)
|
||||
Class.forName(policyType).newInstance();
|
||||
} catch (Exception e) {
|
||||
String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
|
||||
throw new RuntimeException(message, e);
|
||||
}
|
||||
return orderingPolicy;
|
||||
}
|
||||
|
||||
public void setUserLimit(String queue, int userLimit) {
|
||||
setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
|
||||
LOG.debug("here setUserLimit: queuePrefix=" + getQueuePrefix(queue) +
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
@ -93,7 +94,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
private int nodeLocalityDelay;
|
||||
|
||||
Set<FiCaSchedulerApp> activeApplications;
|
||||
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
|
||||
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
|
||||
|
||||
|
@ -121,6 +121,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
private volatile ResourceLimits currentResourceLimits = null;
|
||||
|
||||
private OrderingPolicy<FiCaSchedulerApp>
|
||||
orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
|
||||
|
||||
public LeafQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
super(cs, queueName, parent, old);
|
||||
|
@ -137,7 +140,6 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
cs.getApplicationComparator();
|
||||
this.pendingApplications =
|
||||
new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
||||
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
|
||||
|
||||
setupQueueConfigs(cs.getClusterResource());
|
||||
}
|
||||
|
@ -159,6 +161,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
setQueueResourceLimitsInfo(clusterResource);
|
||||
|
||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||
|
||||
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
|
||||
|
||||
userLimit = conf.getUserLimit(getQueuePath());
|
||||
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
|
||||
|
||||
|
@ -322,7 +327,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
public synchronized int getNumActiveApplications() {
|
||||
return activeApplications.size();
|
||||
return orderingPolicy.getNumSchedulableEntities();
|
||||
}
|
||||
|
||||
@Private
|
||||
|
@ -637,7 +642,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
}
|
||||
}
|
||||
user.activateApplication();
|
||||
activeApplications.add(application);
|
||||
orderingPolicy.addSchedulableEntity(application);
|
||||
queueUsage.incAMUsed(application.getAMResource());
|
||||
user.getResourceUsage().incAMUsed(application.getAMResource());
|
||||
i.remove();
|
||||
|
@ -686,7 +691,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
public synchronized void removeApplicationAttempt(
|
||||
FiCaSchedulerApp application, User user) {
|
||||
boolean wasActive = activeApplications.remove(application);
|
||||
boolean wasActive =
|
||||
orderingPolicy.removeSchedulableEntity(application);
|
||||
if (!wasActive) {
|
||||
pendingApplications.remove(application);
|
||||
} else {
|
||||
|
@ -727,7 +733,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("assignContainers: node=" + node.getNodeName()
|
||||
+ " #applications=" + activeApplications.size());
|
||||
+ " #applications=" +
|
||||
orderingPolicy.getNumSchedulableEntities());
|
||||
}
|
||||
|
||||
// Check for reserved resources
|
||||
|
@ -759,9 +766,10 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
return NULL_ASSIGNMENT;
|
||||
}
|
||||
|
||||
// Try to assign containers to applications in order
|
||||
for (FiCaSchedulerApp application : activeApplications) {
|
||||
|
||||
for (Iterator<FiCaSchedulerApp> assignmentIterator =
|
||||
orderingPolicy.getAssignmentIterator();
|
||||
assignmentIterator.hasNext();) {
|
||||
FiCaSchedulerApp application = assignmentIterator.next();
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("pre-assignContainers for application "
|
||||
+ application.getApplicationId());
|
||||
|
@ -1607,6 +1615,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// Inform the node
|
||||
node.allocateContainer(allocatedContainer);
|
||||
|
||||
// Inform the ordering policy
|
||||
orderingPolicy.containerAllocated(application, allocatedContainer);
|
||||
|
||||
LOG.info("assignedContainer" +
|
||||
" application attempt=" + application.getApplicationAttemptId() +
|
||||
" container=" + container +
|
||||
|
@ -1715,11 +1726,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
removed =
|
||||
application.containerCompleted(rmContainer, containerStatus,
|
||||
event, node.getPartition());
|
||||
|
||||
node.releaseContainer(container);
|
||||
}
|
||||
|
||||
// Book-keeping
|
||||
if (removed) {
|
||||
|
||||
// Inform the ordering policy
|
||||
orderingPolicy.containerReleased(application, rmContainer);
|
||||
|
||||
releaseResource(clusterResource, application,
|
||||
container.getResource(), node.getPartition());
|
||||
LOG.info("completedContainer" +
|
||||
|
@ -1822,7 +1838,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
activateApplications();
|
||||
|
||||
// Update application properties
|
||||
for (FiCaSchedulerApp application : activeApplications) {
|
||||
for (FiCaSchedulerApp application :
|
||||
orderingPolicy.getSchedulableEntities()) {
|
||||
synchronized (application) {
|
||||
computeUserLimitAndSetHeadroom(application, clusterResource,
|
||||
Resources.none(), RMNodeLabelsManager.NO_LABEL,
|
||||
|
@ -1920,15 +1937,15 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
/**
|
||||
* Obtain (read-only) collection of active applications.
|
||||
*/
|
||||
public Set<FiCaSchedulerApp> getApplications() {
|
||||
// need to access the list of apps from the preemption monitor
|
||||
return activeApplications;
|
||||
public Collection<FiCaSchedulerApp> getApplications() {
|
||||
return orderingPolicy.getSchedulableEntities();
|
||||
}
|
||||
|
||||
// return a single Resource capturing the overal amount of pending resources
|
||||
public synchronized Resource getTotalResourcePending() {
|
||||
Resource ret = BuilderUtils.newResource(0, 0);
|
||||
for (FiCaSchedulerApp f : activeApplications) {
|
||||
for (FiCaSchedulerApp f :
|
||||
orderingPolicy.getSchedulableEntities()) {
|
||||
Resources.addTo(ret, f.getTotalPendingRequests());
|
||||
}
|
||||
return ret;
|
||||
|
@ -1940,7 +1957,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
for (FiCaSchedulerApp pendingApp : pendingApplications) {
|
||||
apps.add(pendingApp.getApplicationAttemptId());
|
||||
}
|
||||
for (FiCaSchedulerApp app : activeApplications) {
|
||||
for (FiCaSchedulerApp app :
|
||||
orderingPolicy.getSchedulableEntities()) {
|
||||
apps.add(app.getApplicationAttemptId());
|
||||
}
|
||||
}
|
||||
|
@ -1993,6 +2011,19 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
this.maxApplications = maxApplications;
|
||||
}
|
||||
|
||||
public synchronized OrderingPolicy<FiCaSchedulerApp>
|
||||
getOrderingPolicy() {
|
||||
return orderingPolicy;
|
||||
}
|
||||
|
||||
public synchronized void setOrderingPolicy(
|
||||
OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
|
||||
orderingPolicy.addAllSchedulableEntities(
|
||||
this.orderingPolicy.getSchedulableEntities()
|
||||
);
|
||||
this.orderingPolicy = orderingPolicy;
|
||||
}
|
||||
|
||||
/*
|
||||
* Holds shared values used by all applications in
|
||||
* the queue to calculate headroom on demand
|
||||
|
|
|
@ -68,15 +68,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
|
|||
schedulableEntities.add(schedulableEntity);
|
||||
}
|
||||
|
||||
public void setComparator(Comparator<SchedulableEntity> comparator) {
|
||||
this.comparator = comparator;
|
||||
TreeSet<S> schedulableEntities = new TreeSet<S>(comparator);
|
||||
if (this.schedulableEntities != null) {
|
||||
schedulableEntities.addAll(this.schedulableEntities);
|
||||
}
|
||||
this.schedulableEntities = schedulableEntities;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Comparator<SchedulableEntity> getComparator() {
|
||||
return comparator;
|
||||
|
@ -103,7 +94,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
|
|||
}
|
||||
|
||||
@Override
|
||||
public abstract void configure(String conf);
|
||||
public abstract void configure(Map<String, String> conf);
|
||||
|
||||
@Override
|
||||
public abstract void containerAllocated(S schedulableEntity,
|
||||
|
@ -114,6 +105,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
|
|||
RMContainer r);
|
||||
|
||||
@Override
|
||||
public abstract String getStatusMessage();
|
||||
public abstract String getInfo();
|
||||
|
||||
}
|
||||
|
|
|
@ -28,11 +28,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
|||
public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
|
||||
|
||||
public FifoOrderingPolicy() {
|
||||
setComparator(new FifoComparator());
|
||||
this.comparator = new FifoComparator();
|
||||
this.schedulableEntities = new TreeSet<S>(comparator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(String conf) {
|
||||
public void configure(Map<String, String> conf) {
|
||||
|
||||
}
|
||||
|
||||
|
@ -47,7 +48,7 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getStatusMessage() {
|
||||
public String getInfo() {
|
||||
return "FifoOrderingPolicy";
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
|
|||
* Provides configuration information for the policy from the scheduler
|
||||
* configuration
|
||||
*/
|
||||
public void configure(String conf);
|
||||
public void configure(Map<String, String> conf);
|
||||
|
||||
/**
|
||||
* The passed SchedulableEntity has been allocated the passed Container,
|
||||
|
@ -104,6 +104,6 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
|
|||
/**
|
||||
* Display information regarding configuration & status
|
||||
*/
|
||||
public String getStatusMessage();
|
||||
public String getInfo();
|
||||
|
||||
}
|
||||
|
|
|
@ -94,6 +94,7 @@ class CapacitySchedulerPage extends RmView {
|
|||
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
|
||||
_("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
|
||||
_("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
|
||||
_("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
|
||||
_("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
|
||||
|
||||
html._(InfoBlock.class);
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
|||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import javax.xml.bind.annotation.XmlTransient;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
|
||||
|
@ -40,6 +41,9 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
protected ResourceInfo userAMResourceLimit;
|
||||
protected boolean preemptionDisabled;
|
||||
|
||||
@XmlTransient
|
||||
protected String orderingPolicyInfo;
|
||||
|
||||
CapacitySchedulerLeafQueueInfo() {
|
||||
};
|
||||
|
||||
|
@ -57,6 +61,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
|
||||
userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
|
||||
preemptionDisabled = q.getPreemptionDisabled();
|
||||
orderingPolicyInfo = q.getOrderingPolicy().getInfo();
|
||||
}
|
||||
|
||||
public int getNumActiveApplications() {
|
||||
|
@ -107,4 +112,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
|
|||
public boolean getPreemptionDisabled() {
|
||||
return preemptionDisabled;
|
||||
}
|
||||
|
||||
public String getOrderingPolicyInfo() {
|
||||
return orderingPolicyInfo;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
|
|||
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -38,6 +39,8 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
|
@ -46,6 +49,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
|
@ -1032,7 +1036,7 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
when(lq.getTotalResourcePending()).thenReturn(
|
||||
Resource.newInstance(pending[i], 0));
|
||||
// consider moving where CapacityScheduler::comparator accessible
|
||||
NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
|
||||
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
|
||||
new Comparator<FiCaSchedulerApp>() {
|
||||
@Override
|
||||
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
|
||||
|
@ -1056,6 +1060,14 @@ public class TestProportionalCapacityPreemptionPolicy {
|
|||
.thenReturn(appAttemptIdList);
|
||||
}
|
||||
when(lq.getApplications()).thenReturn(qApps);
|
||||
@SuppressWarnings("unchecked")
|
||||
OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
|
||||
when(so.getPreemptionIterator()).thenAnswer(new Answer() {
|
||||
public Object answer(InvocationOnMock invocation) {
|
||||
return qApps.descendingIterator();
|
||||
}
|
||||
});
|
||||
when(lq.getOrderingPolicy()).thenReturn(so);
|
||||
if(setAMResourcePercent != 0.0f){
|
||||
when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
|
||||
}
|
||||
|
|
|
@ -155,6 +155,7 @@ public class TestApplicationLimits {
|
|||
doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
|
||||
doReturn(user).when(application).getUser();
|
||||
doReturn(amResource).when(application).getAMResource();
|
||||
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
|
||||
return application;
|
||||
}
|
||||
|
||||
|
@ -469,7 +470,7 @@ public class TestApplicationLimits {
|
|||
assertEquals(0, queue.getNumPendingApplications());
|
||||
assertEquals(1, queue.getNumActiveApplications(user_0));
|
||||
assertEquals(0, queue.getNumPendingApplications(user_0));
|
||||
assertTrue(queue.activeApplications.contains(app_0));
|
||||
assertTrue(queue.getApplications().contains(app_0));
|
||||
|
||||
// Submit second application
|
||||
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
|
||||
|
@ -479,7 +480,7 @@ public class TestApplicationLimits {
|
|||
assertEquals(0, queue.getNumPendingApplications());
|
||||
assertEquals(2, queue.getNumActiveApplications(user_0));
|
||||
assertEquals(0, queue.getNumPendingApplications(user_0));
|
||||
assertTrue(queue.activeApplications.contains(app_1));
|
||||
assertTrue(queue.getApplications().contains(app_1));
|
||||
|
||||
// Submit third application, should remain pending
|
||||
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
|
||||
|
@ -508,7 +509,7 @@ public class TestApplicationLimits {
|
|||
assertEquals(2, queue.getNumActiveApplications(user_0));
|
||||
assertEquals(1, queue.getNumPendingApplications(user_0));
|
||||
assertFalse(queue.pendingApplications.contains(app_2));
|
||||
assertFalse(queue.activeApplications.contains(app_2));
|
||||
assertFalse(queue.getApplications().contains(app_2));
|
||||
|
||||
// Finish 1st application, app_3 should become active
|
||||
queue.finishApplicationAttempt(app_0, A);
|
||||
|
@ -516,9 +517,9 @@ public class TestApplicationLimits {
|
|||
assertEquals(0, queue.getNumPendingApplications());
|
||||
assertEquals(2, queue.getNumActiveApplications(user_0));
|
||||
assertEquals(0, queue.getNumPendingApplications(user_0));
|
||||
assertTrue(queue.activeApplications.contains(app_3));
|
||||
assertTrue(queue.getApplications().contains(app_3));
|
||||
assertFalse(queue.pendingApplications.contains(app_3));
|
||||
assertFalse(queue.activeApplications.contains(app_0));
|
||||
assertFalse(queue.getApplications().contains(app_0));
|
||||
|
||||
// Finish 2nd application
|
||||
queue.finishApplicationAttempt(app_1, A);
|
||||
|
@ -526,7 +527,7 @@ public class TestApplicationLimits {
|
|||
assertEquals(0, queue.getNumPendingApplications());
|
||||
assertEquals(1, queue.getNumActiveApplications(user_0));
|
||||
assertEquals(0, queue.getNumPendingApplications(user_0));
|
||||
assertFalse(queue.activeApplications.contains(app_1));
|
||||
assertFalse(queue.getApplications().contains(app_1));
|
||||
|
||||
// Finish 4th application
|
||||
queue.finishApplicationAttempt(app_3, A);
|
||||
|
@ -534,7 +535,7 @@ public class TestApplicationLimits {
|
|||
assertEquals(0, queue.getNumPendingApplications());
|
||||
assertEquals(0, queue.getNumActiveApplications(user_0));
|
||||
assertEquals(0, queue.getNumPendingApplications(user_0));
|
||||
assertFalse(queue.activeApplications.contains(app_3));
|
||||
assertFalse(queue.getApplications().contains(app_3));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
|
@ -381,6 +384,20 @@ public class TestLeafQueue {
|
|||
d.submitApplicationAttempt(app_1, user_d); // same user
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPolicyConfiguration() throws Exception {
|
||||
|
||||
CapacitySchedulerConfiguration testConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
|
||||
String tproot = CapacitySchedulerConfiguration.ROOT + "." +
|
||||
"testPolicyRoot" + System.currentTimeMillis();
|
||||
|
||||
OrderingPolicy<FiCaSchedulerApp> comPol =
|
||||
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppAttemptMetrics() throws Exception {
|
||||
|
@ -2011,7 +2028,7 @@ public class TestLeafQueue {
|
|||
e.submitApplicationAttempt(app_2, user_e); // same user
|
||||
|
||||
// before reinitialization
|
||||
assertEquals(2, e.activeApplications.size());
|
||||
assertEquals(2, e.getNumActiveApplications());
|
||||
assertEquals(1, e.pendingApplications.size());
|
||||
|
||||
csConf.setDouble(CapacitySchedulerConfiguration
|
||||
|
@ -2028,7 +2045,7 @@ public class TestLeafQueue {
|
|||
root.reinitialize(newRoot, csContext.getClusterResource());
|
||||
|
||||
// after reinitialization
|
||||
assertEquals(3, e.activeApplications.size());
|
||||
assertEquals(3, e.getNumActiveApplications());
|
||||
assertEquals(0, e.pendingApplications.size());
|
||||
}
|
||||
|
||||
|
@ -2092,7 +2109,7 @@ public class TestLeafQueue {
|
|||
e.submitApplicationAttempt(app_2, user_e); // same user
|
||||
|
||||
// before updating cluster resource
|
||||
assertEquals(2, e.activeApplications.size());
|
||||
assertEquals(2, e.getNumActiveApplications());
|
||||
assertEquals(1, e.pendingApplications.size());
|
||||
|
||||
Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32);
|
||||
|
@ -2100,7 +2117,7 @@ public class TestLeafQueue {
|
|||
new ResourceLimits(clusterResource));
|
||||
|
||||
// after updating cluster resource
|
||||
assertEquals(3, e.activeApplications.size());
|
||||
assertEquals(3, e.getNumActiveApplications());
|
||||
assertEquals(0, e.pendingApplications.size());
|
||||
}
|
||||
|
||||
|
@ -2452,6 +2469,83 @@ public class TestLeafQueue {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFifoAssignment() throws Exception {
|
||||
|
||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||
|
||||
a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
|
||||
|
||||
String host_0_0 = "127.0.0.1";
|
||||
String rack_0 = "rack_0";
|
||||
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
|
||||
|
||||
final int numNodes = 4;
|
||||
Resource clusterResource = Resources.createResource(
|
||||
numNodes * (16*GB), numNodes * 16);
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
|
||||
String user_0 = "user_0";
|
||||
|
||||
final ApplicationAttemptId appAttemptId_0 =
|
||||
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 =
|
||||
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext));
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
||||
final ApplicationAttemptId appAttemptId_1 =
|
||||
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 =
|
||||
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
mock(ActiveUsersManager.class), spyRMContext));
|
||||
a.submitApplicationAttempt(app_1, user_0);
|
||||
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
|
||||
List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
|
||||
|
||||
app_0_requests_0.clear();
|
||||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
|
||||
app_1_requests_0.clear();
|
||||
app_1_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
|
||||
true, priority, recordFactory));
|
||||
app_1.updateResourceRequests(app_1_requests_0);
|
||||
|
||||
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
||||
app_0_requests_0.clear();
|
||||
app_0_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
|
||||
true, priority, recordFactory));
|
||||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
|
||||
app_1_requests_0.clear();
|
||||
app_1_requests_0.add(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
|
||||
true, priority, recordFactory));
|
||||
app_1.updateResourceRequests(app_1_requests_0);
|
||||
|
||||
//Even thought it already has more resources, app_0 will still get
|
||||
//assigned first
|
||||
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
||||
//and only then will app_1
|
||||
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||
Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentAccess() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
|
Loading…
Reference in New Issue