YARN-5545, Fix issues related to Max App in capacity scheduler. Contributed by Bibin A Chundatt
This commit is contained in:
parent
b51806b45f
commit
bde95f90e7
|
@ -869,6 +869,13 @@ public class CapacityScheduler extends
|
|||
String queueName, String user, Priority priority) {
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (isSystemAppsLimitReached()) {
|
||||
String message = "Maximum system application limit reached,"
|
||||
+ "cannot accept submission of application: " + applicationId;
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
|
||||
applicationId, RMAppEventType.APP_REJECTED, message));
|
||||
return;
|
||||
}
|
||||
// Sanity checks.
|
||||
CSQueue queue = getQueue(queueName);
|
||||
if (queue == null) {
|
||||
|
@ -2024,6 +2031,13 @@ public class CapacityScheduler extends
|
|||
return apps;
|
||||
}
|
||||
|
||||
public boolean isSystemAppsLimitReached() {
|
||||
if (root.getNumApplications() < conf.getMaximumSystemApplications()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration(
|
||||
Configuration configuration) throws IOException {
|
||||
try {
|
||||
|
|
|
@ -1133,4 +1133,19 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
|
||||
public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
|
||||
0.2f;
|
||||
|
||||
/**
|
||||
* Maximum application for a queue to be used when application per queue is
|
||||
* not defined.To be consistent with previous version the default value is set
|
||||
* as UNDEFINED.
|
||||
*/
|
||||
@Private
|
||||
public static final String QUEUE_GLOBAL_MAX_APPLICATION =
|
||||
PREFIX + "global-queue-max-application";
|
||||
|
||||
public int getGlobalMaximumApplicationsPerQueue() {
|
||||
int maxApplicationsPerQueue =
|
||||
getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED);
|
||||
return maxApplicationsPerQueue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -197,10 +197,15 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
|
||||
if (maxApplications < 0) {
|
||||
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
|
||||
if (maxGlobalPerQueueApps > 0) {
|
||||
maxApplications = maxGlobalPerQueueApps;
|
||||
} else {
|
||||
int maxSystemApps = conf.getMaximumSystemApplications();
|
||||
maxApplications =
|
||||
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
|
||||
}
|
||||
}
|
||||
maxApplicationsPerUser = Math.min(maxApplications,
|
||||
(int) (maxApplications * (userLimit / 100.0f) * userLimitFactor));
|
||||
|
||||
|
|
|
@ -31,14 +31,17 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -47,11 +50,17 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
|
@ -61,12 +70,15 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class TestApplicationLimits {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class);
|
||||
|
@ -693,9 +705,158 @@ public class TestApplicationLimits {
|
|||
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
|
||||
}
|
||||
|
||||
private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
||||
CapacitySchedulerConfiguration conf =
|
||||
new CapacitySchedulerConfiguration(config);
|
||||
// Define top-level
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[]{"a", "b", "c", "d"});
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
conf.setInt(CapacitySchedulerConfiguration.QUEUE_GLOBAL_MAX_APPLICATION,
|
||||
20);
|
||||
conf.setInt("yarn.scheduler.capacity.root.a.a1.maximum-applications", 1);
|
||||
conf.setFloat("yarn.scheduler.capacity.root.d.user-limit-factor", 0.1f);
|
||||
conf.setInt("yarn.scheduler.capacity.maximum-applications", 4);
|
||||
final String a = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
final String b = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
final String c = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
final String d = CapacitySchedulerConfiguration.ROOT + ".d";
|
||||
final String aa1 = a + ".a1";
|
||||
final String aa2 = a + ".a2";
|
||||
final String aa3 = a + ".a3";
|
||||
|
||||
conf.setQueues(a, new String[]{"a1", "a2", "a3"});
|
||||
conf.setCapacity(a, 50);
|
||||
conf.setCapacity(b, 50);
|
||||
conf.setCapacity(c, 0);
|
||||
conf.setCapacity(d, 0);
|
||||
conf.setCapacity(aa1, 50);
|
||||
conf.setCapacity(aa2, 50);
|
||||
conf.setCapacity(aa3, 0);
|
||||
|
||||
conf.setCapacityByLabel(a, "y", 25);
|
||||
conf.setCapacityByLabel(b, "y", 50);
|
||||
conf.setCapacityByLabel(c, "y", 25);
|
||||
conf.setCapacityByLabel(d, "y", 0);
|
||||
|
||||
conf.setCapacityByLabel(a, "x", 50);
|
||||
conf.setCapacityByLabel(b, "x", 50);
|
||||
|
||||
conf.setCapacityByLabel(a, "z", 50);
|
||||
conf.setCapacityByLabel(b, "z", 50);
|
||||
|
||||
conf.setCapacityByLabel(aa1, "x", 100);
|
||||
conf.setCapacityByLabel(aa2, "x", 0);
|
||||
|
||||
conf.setCapacityByLabel(aa1, "y", 25);
|
||||
conf.setCapacityByLabel(aa2, "y", 75);
|
||||
|
||||
conf.setCapacityByLabel(aa2, "z", 75);
|
||||
conf.setCapacityByLabel(aa3, "z", 25);
|
||||
return conf;
|
||||
}
|
||||
|
||||
private Set<String> toSet(String... elements) {
|
||||
Set<String> set = Sets.newHashSet(elements);
|
||||
return set;
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testApplicationLimitSubmit() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||
mgr.init(conf);
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabelsWithDefaultExclusivity(
|
||||
ImmutableSet.of("x", "y", "z"));
|
||||
|
||||
// set mapping:
|
||||
// h1 -> x
|
||||
// h2 -> y
|
||||
mgr.addLabelsToNode(
|
||||
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
|
||||
mgr.addLabelsToNode(
|
||||
ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
rm.getRMContext().setNodeLabelManager(mgr);
|
||||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 4096);
|
||||
MockNM nm2 = rm.registerNode("h2:1234", 4096);
|
||||
MockNM nm3 = rm.registerNode("h3:1234", 4096);
|
||||
|
||||
// Submit application to queue c where the default partition capacity is
|
||||
// zero
|
||||
RMApp app1 = rm.submitApp(GB, "app", "user", null, "c", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
assertEquals(RMAppState.ACCEPTED, app1.getState());
|
||||
rm.killApp(app1.getApplicationId());
|
||||
|
||||
RMApp app2 = rm.submitApp(GB, "app", "user", null, "a1", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
|
||||
assertEquals(RMAppState.ACCEPTED, app2.getState());
|
||||
|
||||
// Check second application is rejected and based on queue level max
|
||||
// application app is rejected
|
||||
RMApp app3 = rm.submitApp(GB, "app", "user", null, "a1", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app3.getApplicationId(), RMAppState.FAILED);
|
||||
assertEquals(RMAppState.FAILED, app3.getState());
|
||||
assertEquals(
|
||||
"org.apache.hadoop.security.AccessControlException: "
|
||||
+ "Queue root.a.a1 already has 1 applications, cannot accept "
|
||||
+ "submission of application: " + app3.getApplicationId(),
|
||||
app3.getDiagnostics().toString());
|
||||
|
||||
// based on Global limit of queue usert application is rejected
|
||||
RMApp app11 = rm.submitApp(GB, "app", "user", null, "d", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
|
||||
assertEquals(RMAppState.ACCEPTED, app11.getState());
|
||||
RMApp app12 = rm.submitApp(GB, "app", "user", null, "d", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
|
||||
assertEquals(RMAppState.ACCEPTED, app12.getState());
|
||||
RMApp app13 = rm.submitApp(GB, "app", "user", null, "d", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app13.getApplicationId(), RMAppState.FAILED);
|
||||
assertEquals(RMAppState.FAILED, app13.getState());
|
||||
assertEquals(
|
||||
"org.apache.hadoop.security.AccessControlException: Queue"
|
||||
+ " root.d already has 2 applications from user user cannot"
|
||||
+ " accept submission of application: " + app13.getApplicationId(),
|
||||
app13.getDiagnostics().toString());
|
||||
|
||||
// based on system max limit application is rejected
|
||||
RMApp app14 = rm.submitApp(GB, "app", "user2", null, "a2", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app14.getApplicationId(), RMAppState.ACCEPTED);
|
||||
RMApp app15 = rm.submitApp(GB, "app", "user2", null, "a2", false);
|
||||
rm.drainEvents();
|
||||
rm.waitForState(app15.getApplicationId(), RMAppState.FAILED);
|
||||
assertEquals(RMAppState.FAILED, app15.getState());
|
||||
assertEquals(
|
||||
"Maximum system application limit reached,cannot"
|
||||
+ " accept submission of application: " + app15.getApplicationId(),
|
||||
app15.getDiagnostics().toString());
|
||||
|
||||
rm.killApp(app2.getApplicationId());
|
||||
rm.killApp(app11.getApplicationId());
|
||||
rm.killApp(app13.getApplicationId());
|
||||
rm.killApp(app14.getApplicationId());
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue