YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)

Wangda Tan 2015-04-23 10:47:15 -07:00
parent 49f6e3d35e
commit 395205444e
7 changed files with 450 additions and 2 deletions

CHANGES.txt

@ -100,6 +100,8 @@ Release 2.8.0 - UNRELEASED
network bandwidth traffic originating from YARN containers (Sidharta Seethana
via vinodkv)
+YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

findbugs-exclude.xml

@ -145,6 +145,14 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
+<Match>
+<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy$FairComparator" />
+<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+</Match>
+<Match>
+<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator" />
+<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />

CapacitySchedulerConfiguration.java

@ -122,7 +122,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String ORDERING_POLICY = "ordering-policy";
public static final String DEFAULT_ORDERING_POLICY = "fifo";
public static final String FIFO_ORDERING_POLICY = "fifo";
public static final String FAIR_ORDERING_POLICY = "fair";
public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY;
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@ -395,9 +399,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
OrderingPolicy<S> orderingPolicy;
if (policyType.trim().equals("fifo")) {
if (policyType.trim().equals(FIFO_ORDERING_POLICY)) {
policyType = FifoOrderingPolicy.class.getName();
}
if (policyType.trim().equals(FAIR_ORDERING_POLICY)) {
policyType = FairOrderingPolicy.class.getName();
}
try {
orderingPolicy = (OrderingPolicy<S>)
Class.forName(policyType).newInstance();
@ -405,6 +412,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
throw new RuntimeException(message, e);
}
+Map<String, String> config = new HashMap<String, String>();
+String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + ".";
+for (Map.Entry<String, String> kv : this) {
+if (kv.getKey().startsWith(confPrefix)) {
+config.put(kv.getKey().substring(confPrefix.length()), kv.getValue());
+}
+}
+orderingPolicy.configure(config);
return orderingPolicy;
}
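A usage sketch of the block above (not part of the patch; it assumes the standard "yarn.scheduler.capacity." key prefix and a hypothetical queue "root.a"): the new constants plus the per-policy key scraping let a queue select the fair policy and pass its settings through to configure().

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;

public class FairPolicyConfigSketch {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Hypothetical queue path "root.a"; keys mirror the ORDERING_POLICY prefix used above.
    conf.set("yarn.scheduler.capacity.root.a.ordering-policy", "fair");
    conf.set("yarn.scheduler.capacity.root.a.ordering-policy."
        + "fair.enable-size-based-weight", "true");
    OrderingPolicy<FiCaSchedulerApp> policy =
        conf.<FiCaSchedulerApp>getOrderingPolicy("root.a");
    // Expected: "FairOrderingPolicy with sizeBasedWeight"
    System.out.println(policy.getInfo());
  }
}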

CompoundComparator.java (new file)

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
//Some policies will use multiple comparators joined together
class CompoundComparator implements Comparator<SchedulableEntity> {
List<Comparator<SchedulableEntity>> comparators;
CompoundComparator(List<Comparator<SchedulableEntity>> comparators) {
this.comparators = comparators;
}
@Override
public int compare(final SchedulableEntity r1, final SchedulableEntity r2) {
for (Comparator<SchedulableEntity> comparator : comparators) {
int result = comparator.compare(r1, r2);
if (result != 0) return result;
}
return 0;
}
}

FairOrderingPolicy.java (new file)

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
/**
 * An OrderingPolicy which orders SchedulableEntities for fairness (see
 * FairScheduler FairSharePolicy): in general, entities with lower resource
 * usage are ordered first. If sizeBasedWeight is set to true, an application
 * with high demand may be prioritized ahead of an application with lower
 * usage. This offsets the tendency to favor small apps, which could otherwise
 * starve large apps when many small ones continuously enter and leave the
 * queue (optional, default false).
 */
public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
public static final String ENABLE_SIZE_BASED_WEIGHT =
"fair.enable-size-based-weight";
protected class FairComparator implements Comparator<SchedulableEntity> {
@Override
public int compare(final SchedulableEntity r1, final SchedulableEntity r2) {
int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) );
return res;
}
}
private CompoundComparator fairComparator;
private boolean sizeBasedWeight = false;
public FairOrderingPolicy() {
List<Comparator<SchedulableEntity>> comparators =
new ArrayList<Comparator<SchedulableEntity>>();
comparators.add(new FairComparator());
comparators.add(new FifoComparator());
fairComparator = new CompoundComparator(
comparators
);
this.comparator = fairComparator;
this.schedulableEntities = new TreeSet<S>(comparator);
}
private double getMagnitude(SchedulableEntity r) {
double mag = r.getSchedulingResourceUsage().getCachedUsed(
CommonNodeLabelsManager.ANY).getMemory();
if (sizeBasedWeight) {
double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand(
CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2);
mag = mag / weight;
}
return mag;
}
@VisibleForTesting
public boolean getSizeBasedWeight() {
return sizeBasedWeight;
}
@VisibleForTesting
public void setSizeBasedWeight(boolean sizeBasedWeight) {
this.sizeBasedWeight = sizeBasedWeight;
}
@Override
public void configure(Map<String, String> conf) {
if (conf.containsKey(ENABLE_SIZE_BASED_WEIGHT)) {
sizeBasedWeight = Boolean.valueOf(conf.get(ENABLE_SIZE_BASED_WEIGHT));
}
}
@Override
public void containerAllocated(S schedulableEntity,
RMContainer r) {
reorderSchedulableEntity(schedulableEntity);
}
@Override
public void containerReleased(S schedulableEntity,
RMContainer r) {
reorderSchedulableEntity(schedulableEntity);
}
@Override
public String getInfo() {
String sbw = sizeBasedWeight ? " with sizeBasedWeight" : "";
return "FairOrderingPolicy" + sbw;
}
}
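For intuition, a rough sketch of the getMagnitude() math above (illustrative values only, not from the patch): without size-based weight the magnitude is simply the cached memory used, so lower usage orders first; with it, usage is divided by log2(1 + pending demand), so a large demand shrinks the effective usage, and the FifoComparator inside the CompoundComparator breaks any remaining ties.

// Illustrative only: mirrors FairOrderingPolicy#getMagnitude with made-up values.
public class FairMagnitudeSketch {
  static double magnitude(long usedMB, long demandMB, boolean sizeBasedWeight) {
    double mag = usedMB;
    if (sizeBasedWeight) {
      mag = mag / (Math.log1p(demandMB) / Math.log(2)); // divide by log2(1 + demand)
    }
    return mag;
  }
  public static void main(String[] args) {
    // 4 GB used, 100 GB still pending
    System.out.println(magnitude(4096, 102400, false)); // 4096.0
    System.out.println(magnitude(4096, 102400, true));  // ~246, so it orders much earlier
  }
}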

TestLeafQueue.java

@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -459,6 +460,41 @@ public class TestLeafQueue {
assertEquals(1, userMetrics.getAppsSubmitted());
}
@Test
public void testFairConfiguration() throws Exception {
CapacitySchedulerConfiguration testConf =
new CapacitySchedulerConfiguration();
String tproot = CapacitySchedulerConfiguration.ROOT + "." +
"testPolicyRoot" + System.currentTimeMillis();
OrderingPolicy<FiCaSchedulerApp> schedOrder =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
//override default to fair
String policyType = CapacitySchedulerConfiguration.PREFIX + tproot +
"." + CapacitySchedulerConfiguration.ORDERING_POLICY;
testConf.set(policyType,
CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
schedOrder =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
FairOrderingPolicy fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
assertFalse(fop.getSizeBasedWeight());
//Now with sizeBasedWeight
String sbwConfig = CapacitySchedulerConfiguration.PREFIX + tproot +
"." + CapacitySchedulerConfiguration.ORDERING_POLICY + "." +
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT;
testConf.set(sbwConfig, "true");
schedOrder =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
assertTrue(fop.getSizeBasedWeight());
}
@Test
public void testSingleQueueWithOneUser() throws Exception {
@ -2621,6 +2657,86 @@ public class TestLeafQueue {
}
@Test
public void testFairAssignment() throws Exception {
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
OrderingPolicy<FiCaSchedulerApp> schedulingOrder =
new FairOrderingPolicy<FiCaSchedulerApp>();
a.setOrderingPolicy(schedulingOrder);
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";
FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
final int numNodes = 4;
Resource clusterResource = Resources.createResource(
numNodes * (16*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
String user_0 = "user_0";
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext));
a.submitApplicationAttempt(app_1, user_0);
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_1_requests_0.clear();
app_1_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
true, priority, recordFactory));
app_1.updateResourceRequests(app_1_requests_0);
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
app_1_requests_0.clear();
app_1_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
true, priority, recordFactory));
app_1.updateResourceRequests(app_1_requests_0);
//Since it already has more resources, app_0 will not get
//assigned first, but app_1 will
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
//and only then will app_0
a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
}
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
LeafQueue defaultQueue) {
List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();

TestFairOrderingPolicy.java (new file)

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
public class TestFairOrderingPolicy {
final static int GB = 1024;
@Test
public void testSimpleComparison() {
FairOrderingPolicy<MockSchedulableEntity> policy =
new FairOrderingPolicy<MockSchedulableEntity>();
MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity();
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
//consumption
r1.setUsed(Resources.createResource(1, 0));
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
r1.getSchedulingResourceUsage());
Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
}
@Test
public void testSizeBasedWeight() {
FairOrderingPolicy<MockSchedulableEntity> policy =
new FairOrderingPolicy<MockSchedulableEntity>();
policy.setSizeBasedWeight(true);
MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity();
//No changes, equal
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
r1.setUsed(Resources.createResource(4 * GB));
r2.setUsed(Resources.createResource(4 * GB));
r1.setPending(Resources.createResource(4 * GB));
r2.setPending(Resources.createResource(4 * GB));
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
r1.getSchedulingResourceUsage());
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
r2.getSchedulingResourceUsage());
//Same, equal
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
r2.setUsed(Resources.createResource(5 * GB));
r2.setPending(Resources.createResource(5 * GB));
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
r2.getSchedulingResourceUsage());
//More demand and consumption, but not enough more demand to overcome
//additional consumption
Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
//High demand, enough to reverse sbw
r2.setPending(Resources.createResource(100 * GB));
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
r2.getSchedulingResourceUsage());
Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
}
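A quick back-of-the-envelope check of the assertions above, using the magnitude formula usedMB / log2(1 + pendingMB) (approximate values, not part of the test):

// r1: 4 GB used, 4 GB pending; r2: 5 GB used, first 5 GB then 100 GB pending
double r1  = 4096 / (Math.log1p(4096)   / Math.log(2)); // ~341
double r2a = 5120 / (Math.log1p(5120)   / Math.log(2)); // ~416 -> r1 < r2a, compare < 0
double r2b = 5120 / (Math.log1p(102400) / Math.log(2)); // ~308 -> r1 > r2b, compare > 0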
@Test
public void testIterators() {
OrderingPolicy<MockSchedulableEntity> schedOrder =
new FairOrderingPolicy<MockSchedulableEntity>();
MockSchedulableEntity msp1 = new MockSchedulableEntity();
MockSchedulableEntity msp2 = new MockSchedulableEntity();
MockSchedulableEntity msp3 = new MockSchedulableEntity();
msp1.setId("1");
msp2.setId("2");
msp3.setId("3");
msp1.setUsed(Resources.createResource(3));
msp2.setUsed(Resources.createResource(2));
msp3.setUsed(Resources.createResource(1));
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
msp1.getSchedulingResourceUsage());
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
msp2.getSchedulingResourceUsage());
AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
msp3.getSchedulingResourceUsage());
schedOrder.addSchedulableEntity(msp1);
schedOrder.addSchedulableEntity(msp2);
schedOrder.addSchedulableEntity(msp3);
//Assignment, least to greatest consumption
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
//Preemption, greatest to least
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Change value without inform, should see no change
msp2.setUsed(Resources.createResource(6));
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
//Do inform, will reorder
schedOrder.containerAllocated(msp2, null);
checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
}
public void checkIds(Iterator<MockSchedulableEntity> si,
String[] ids) {
for (int i = 0;i < ids.length;i++) {
Assert.assertEquals(si.next().getId(),
ids[i]);
}
}
}