YARN-4617. LeafQueue#pendingOrderingPolicy should always use fixed ordering policy instead of using same as active applications ordering policy. Contributed by Rohith Sharma K S

(cherry picked from commit f4a57d4a53)
This commit is contained in:
Jian He 2016-01-29 12:22:06 -08:00
parent f028e3d7bc
commit e45799a61b
10 changed files with 244 additions and 73 deletions

View File

@ -1183,6 +1183,10 @@ Release 2.8.0 - UNRELEASED
YARN-4643. Container recovery is broken with delegating container runtime
(Sidharta Seethana via jlowe)
YARN-4617. LeafQueue#pendingOrderingPolicy should always use fixed ordering
policy instead of using same as active applications ordering policy.
(Rohith Sharma K S via jianhe)
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -162,6 +162,10 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.PriorityComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />

View File

@ -968,7 +968,8 @@ protected void getActivedAppDiagnosticMessage(
// queue's resource usage for specific partition
}
public boolean isAttemptRecovering() {
@Override
public boolean isRecovering() {
return isAttemptRecovering;
}

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@ -94,9 +95,6 @@ public class LeafQueue extends AbstractCSQueue {
private Priority defaultAppPriorityPerQueue;
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
// Always give preference to this while activating the application attempts.
private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
private volatile float minimumAllocationFactor;
@ -124,6 +122,7 @@ public class LeafQueue extends AbstractCSQueue {
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
new HashMap<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
@ -131,6 +130,9 @@ public LeafQueue(CapacitySchedulerContext cs,
this.activeUsersManager = new ActiveUsersManager(metrics);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
@ -157,11 +159,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicy(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicyRecovery(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@ -325,8 +323,7 @@ public synchronized int getNumApplications() {
}
public synchronized int getNumPendingApplications() {
return pendingOrderingPolicy.getNumSchedulableEntities()
+ pendingOPForRecoveredApps.getNumSchedulableEntities();
return pendingOrderingPolicy.getNumSchedulableEntities();
}
public synchronized int getNumActiveApplications() {
@ -633,18 +630,9 @@ private synchronized void activateApplications() {
calculateAndGetAMResourceLimitPerPartition(nodePartition);
}
activateApplications(getPendingAppsOrderingPolicyRecovery()
.getAssignmentIterator(), userAmPartitionLimit);
activateApplications(
getPendingAppsOrderingPolicy().getAssignmentIterator(),
userAmPartitionLimit);
}
private synchronized void activateApplications(
Iterator<FiCaSchedulerApp> fsApp,
Map<String, Resource> userAmPartitionLimit) {
while (fsApp.hasNext()) {
for (Iterator<FiCaSchedulerApp> fsApp =
getPendingAppsOrderingPolicy().getAssignmentIterator();
fsApp.hasNext();) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
@ -746,11 +734,7 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
// Accept
user.submitApplication();
if (application.isAttemptRecovering()) {
getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
} else {
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
}
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications
@ -790,11 +774,7 @@ public synchronized void removeApplicationAttempt(
boolean wasActive =
orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
if (application.isAttemptRecovering()) {
pendingOPForRecoveredApps.removeSchedulableEntity(application);
} else {
pendingOrderingPolicy.removeSchedulableEntity(application);
}
pendingOrderingPolicy.removeSchedulableEntity(application);
} else {
queueUsage.decAMUsed(partitionName,
application.getAMResource(partitionName));
@ -1545,18 +1525,16 @@ public void recoverContainer(Resource clusterResource,
* Obtain (read-only) collection of pending applications.
*/
public Collection<FiCaSchedulerApp> getPendingApplications() {
Collection<FiCaSchedulerApp> pendingApps =
new ArrayList<FiCaSchedulerApp>();
pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
return pendingApps;
return Collections.unmodifiableCollection(pendingOrderingPolicy
.getSchedulableEntities());
}
/**
* Obtain (read-only) collection of active applications.
*/
public Collection<FiCaSchedulerApp> getApplications() {
return orderingPolicy.getSchedulableEntities();
return Collections.unmodifiableCollection(orderingPolicy
.getSchedulableEntities());
}
// Consider the headroom for each user in the queue.
@ -1593,10 +1571,6 @@ public synchronized Resource getTotalPendingResourcesConsideringUserLimit(
@Override
public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
}
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
@ -1722,30 +1696,6 @@ public void decreaseContainer(Resource clusterResource,
getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy;
}
public synchronized void setPendingAppsOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy) {
if (null != this.pendingOrderingPolicy) {
pendingOrderingPolicy
.addAllSchedulableEntities(this.pendingOrderingPolicy
.getSchedulableEntities());
}
this.pendingOrderingPolicy = pendingOrderingPolicy;
}
public synchronized OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicyRecovery() {
return pendingOPForRecoveredApps;
}
public synchronized void setPendingAppsOrderingPolicyRecovery(
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
if (null != this.pendingOPForRecoveredApps) {
pendingOrderingPolicyRecovery
.addAllSchedulableEntities(this.pendingOPForRecoveredApps
.getSchedulableEntities());
}
this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
}
/*
* Holds shared values used by all applications in

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* This ordering policy is used for pending applications only.
* An OrderingPolicy which orders SchedulableEntities by
* <ul>
* <li>Recovering application
* <li>Priority of an application
* <li>Input order
* </ul>
* <p>
* Example : If schedulableEntities with E1(true,1,1) E2(true,2,2) E3(true,3,3)
* E4(false,4,4) E5(false,4,5) are added. The ordering policy assignment
* iterator is in the order of E3(true,3,3) E2(true,2,2) E1(true,1,1)
* E5(false,5,5) E4(false,4,4)
*/
public class FifoOrderingPolicyForPendingApps<S extends SchedulableEntity>
extends AbstractComparatorOrderingPolicy<S> {
public FifoOrderingPolicyForPendingApps() {
List<Comparator<SchedulableEntity>> comparators =
new ArrayList<Comparator<SchedulableEntity>>();
comparators.add(new RecoveryComparator());
comparators.add(new PriorityComparator());
comparators.add(new FifoComparator());
this.comparator = new CompoundComparator(comparators);
this.schedulableEntities = new TreeSet<S>(comparator);
}
@Override
public String getInfo() {
return "FifoOrderingPolicyForPendingApps";
}
@Override
public void configure(Map<String, String> conf) {
}
@Override
public void containerAllocated(S schedulableEntity, RMContainer r) {
}
@Override
public void containerReleased(S schedulableEntity, RMContainer r) {
}
@Override
public void demandUpdated(S schedulableEntity) {
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.Comparator;
/**
* A Comparator which orders SchedulableEntities by isRecovering flag.
*/
public class RecoveryComparator implements Comparator<SchedulableEntity> {
@Override
public int compare(SchedulableEntity se1, SchedulableEntity se2) {
int val1 = se1.isRecovering() ? 1 : 0;
int val2 = se2.isRecovering() ? 1 : 0;
return val2 - val1;
}
}

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
/**
* A SchedulableEntity is a process to be scheduled,
* A SchedulableEntity is a process to be scheduled.
* for example, an application / application attempt
*/
public interface SchedulableEntity {
@ -53,4 +50,9 @@ public interface SchedulableEntity {
*/
public Priority getPriority();
/**
* Whether application was running before RM restart.
*/
public boolean isRecovering();
}

View File

@ -2399,7 +2399,6 @@ public void testFifoAssignment() throws Exception {
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";
@ -2549,7 +2548,6 @@ public void testFairAssignment() throws Exception {
new FairOrderingPolicy<FiCaSchedulerApp>();
a.setOrderingPolicy(schedulingOrder);
a.setPendingAppsOrderingPolicy(new FairOrderingPolicy<FiCaSchedulerApp>());
String host_0_0 = "127.0.0.1";
String rack_0 = "rack_0";

View File

@ -32,9 +32,17 @@ public class MockSchedulableEntity implements SchedulableEntity {
private String id;
private long serial = 0;
private Priority priority;
private boolean isRecovering;
public MockSchedulableEntity() { }
public MockSchedulableEntity(long serial, int priority,
boolean isRecovering) {
this.serial = serial;
this.priority = Priority.newInstance(priority);
this.isRecovering = isRecovering;
}
public void setId(String id) {
this.id = id;
}
@ -84,4 +92,13 @@ public Priority getPriority() {
public void setApplicationPriority(Priority priority) {
this.priority = priority;
}
@Override
public boolean isRecovering() {
return isRecovering;
}
protected void setRecovering(boolean entityRecovering) {
this.isRecovering = entityRecovering;
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.junit.Assert;
import org.junit.Test;
public class TestFifoOrderingPolicyForPendingApps {
@Test
public void testFifoOrderingPolicyForPendingApps() {
FifoOrderingPolicyForPendingApps<MockSchedulableEntity> policy =
new FifoOrderingPolicyForPendingApps<MockSchedulableEntity>();
MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity();
Assert.assertEquals(policy.getComparator().compare(r1, r2), 0);
r1.setSerial(1);
r1.setRecovering(true);
Assert.assertEquals(policy.getComparator().compare(r1, r2), -1);
r1.setRecovering(false);
r2.setSerial(2);
r2.setRecovering(true);
Assert.assertEquals(policy.getComparator().compare(r1, r2), 1);
}
/**
* Entities submitted with E1-Recovering, E2-Recovering, E3-Recovering, E4-not
* recovering, E5-not recovering.
* Expected Iterator Output : E-3 E-2 E-1 E-5 E-4
*/
@Test
public void testIterators() {
OrderingPolicy<MockSchedulableEntity> schedOrder =
new FifoOrderingPolicyForPendingApps<MockSchedulableEntity>();
MockSchedulableEntity msp1 = new MockSchedulableEntity(1, 1, true);
MockSchedulableEntity msp2 = new MockSchedulableEntity(2, 2, true);
MockSchedulableEntity msp3 = new MockSchedulableEntity(3, 3, true);
MockSchedulableEntity msp4 = new MockSchedulableEntity(4, 2, true);
MockSchedulableEntity msp5 = new MockSchedulableEntity(5, 5, false);
MockSchedulableEntity msp6 = new MockSchedulableEntity(6, 6, false);
MockSchedulableEntity msp7 = new MockSchedulableEntity(7, 5, false);
schedOrder.addSchedulableEntity(msp1);
schedOrder.addSchedulableEntity(msp2);
schedOrder.addSchedulableEntity(msp3);
schedOrder.addSchedulableEntity(msp4);
schedOrder.addSchedulableEntity(msp5);
schedOrder.addSchedulableEntity(msp6);
schedOrder.addSchedulableEntity(msp7);
// Assignment with serial id's are 3,2,4,1,6,5,7
checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
6, 5, 7 });
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,
4, 2, 3 });
}
public void checkSerials(Iterator<MockSchedulableEntity> si,
long[] serials) {
for (int i = 0; i < serials.length; i++) {
Assert.assertEquals(si.next().getSerial(), serials[i]);
}
}
}