YARN-4565. Fix a bug that leads to AM resource limit not hornored when sizeBasedWeight enabled for FairOrderingPolicy. Contributed by Wangda Tan

This commit is contained in:
Jian He 2016-01-18 18:28:05 -08:00
parent ca22066582
commit 914a8fff56
5 changed files with 73 additions and 2 deletions

View File

@ -1152,6 +1152,9 @@ Release 2.8.0 - UNRELEASED
YARN-4502. Fix two AM containers get allocated when AM restart. YARN-4502. Fix two AM containers get allocated when AM restart.
(Vinod Kumar Vavilapalli via wangda) (Vinod Kumar Vavilapalli via wangda)
YARN-4565. Fix a bug that leads to AM resource limit not hornored when
sizeBasedWeight enabled for FairOrderingPolicy. (wtan via jianhe)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -863,7 +863,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
} }
@Override @Override
public synchronized ResourceUsage getSchedulingResourceUsage() { public ResourceUsage getSchedulingResourceUsage() {
return attemptResourceUsage; return attemptResourceUsage;
} }

View File

@ -975,7 +975,8 @@ public class CapacityScheduler extends
clusterResource, getMinimumResourceCapability()); clusterResource, getMinimumResourceCapability());
} }
if (updateDemandForQueue != null) { if (updateDemandForQueue != null && !application
.isWaitingForAMContainer()) {
updateDemandForQueue.getOrderingPolicy().demandUpdated(application); updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
} }

View File

@ -30,6 +30,7 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -959,4 +960,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
return defaultPriority; return defaultPriority;
} }
@VisibleForTesting
public void setOrderingPolicy(String queue, String policy) {
set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
}
@VisibleForTesting
public void setOrderingPolicyParameter(String queue,
String parameterKey, String parameterValue) {
set(getQueuePrefix(queue) + ORDERING_POLICY + "."
+ parameterKey, parameterValue);
}
} }

View File

@ -20,6 +20,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*; import java.util.*;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -138,6 +150,48 @@ public class TestFairOrderingPolicy {
checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"}); checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
} }
@Test
public void testSizeBasedWeightNotAffectAppActivation() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
// Define top-level queues
String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
csConf.setOrderingPolicyParameter(queuePath,
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);
// inject node label manager
MockRM rm = new MockRM(csConf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Get LeafQueue
LeafQueue lq = (LeafQueue) cs.getQueue("default");
OrderingPolicy<FiCaSchedulerApp> policy = lq.getOrderingPolicy();
Assert.assertTrue(policy instanceof FairOrderingPolicy);
Assert.assertTrue(((FairOrderingPolicy<FiCaSchedulerApp>)policy).getSizeBasedWeight());
rm.registerNode("h1:1234", 10 * GB);
// Submit 4 apps
rm.submitApp(1 * GB, "app", "user", null, "default");
rm.submitApp(1 * GB, "app", "user", null, "default");
rm.submitApp(1 * GB, "app", "user", null, "default");
rm.submitApp(1 * GB, "app", "user", null, "default");
Assert.assertEquals(1, lq.getNumActiveApplications());
Assert.assertEquals(3, lq.getNumPendingApplications());
// Try allocate once, #active-apps and #pending-apps should be still correct
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(NodeId.newInstance("h1", 1234))));
Assert.assertEquals(1, lq.getNumActiveApplications());
Assert.assertEquals(3, lq.getNumPendingApplications());
}
public void checkIds(Iterator<MockSchedulableEntity> si, public void checkIds(Iterator<MockSchedulableEntity> si,
String[] ids) { String[] ids) {
for (int i = 0;i < ids.length;i++) { for (int i = 0;i < ids.length;i++) {