YARN-6575. Support global configuration mutation in MutableConfProvider. (Jonathan Hung via Xuan Gong)
This commit is contained in:
parent
a47823ca98
commit
3c50084219
|
@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for determining whether configuration mutations are allowed.
|
* Interface for determining whether configuration mutations are allowed.
|
||||||
|
@ -41,7 +41,7 @@ public interface ConfigurationMutationACLPolicy {
|
||||||
* @param confUpdate configurations to be updated
|
* @param confUpdate configurations to be updated
|
||||||
* @return whether provided mutation is allowed or not
|
* @return whether provided mutation is allowed or not
|
||||||
*/
|
*/
|
||||||
boolean isMutationAllowed(UserGroupInformation user, QueueConfigsUpdateInfo
|
boolean isMutationAllowed(UserGroupInformation user, SchedConfUpdateInfo
|
||||||
confUpdate);
|
confUpdate);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default configuration mutation ACL policy. Checks if user is YARN admin.
|
* Default configuration mutation ACL policy. Checks if user is YARN admin.
|
||||||
|
@ -39,7 +39,7 @@ public class DefaultConfigurationMutationACLPolicy implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isMutationAllowed(UserGroupInformation user,
|
public boolean isMutationAllowed(UserGroupInformation user,
|
||||||
QueueConfigsUpdateInfo confUpdate) {
|
SchedConfUpdateInfo confUpdate) {
|
||||||
return authorizer.isAdmin(user);
|
return authorizer.isAdmin(user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ public interface MutableConfScheduler extends ResourceScheduler {
|
||||||
* @throws IOException if update is invalid
|
* @throws IOException if update is invalid
|
||||||
*/
|
*/
|
||||||
void updateConfiguration(UserGroupInformation user,
|
void updateConfiguration(UserGroupInformation user,
|
||||||
QueueConfigsUpdateInfo confUpdate) throws IOException;
|
SchedConfUpdateInfo confUpdate) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the scheduler configuration.
|
* Get the scheduler configuration.
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ public interface MutableConfigurationProvider {
|
||||||
* @param confUpdate Key-value pairs for configurations to be updated.
|
* @param confUpdate Key-value pairs for configurations to be updated.
|
||||||
* @throws IOException if scheduler could not be reinitialized
|
* @throws IOException if scheduler could not be reinitialized
|
||||||
*/
|
*/
|
||||||
void mutateConfiguration(UserGroupInformation user, QueueConfigsUpdateInfo
|
void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
|
||||||
confUpdate) throws IOException;
|
confUpdate) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
@ -2618,7 +2618,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateConfiguration(UserGroupInformation user,
|
public void updateConfiguration(UserGroupInformation user,
|
||||||
QueueConfigsUpdateInfo confUpdate) throws IOException {
|
SchedConfUpdateInfo confUpdate) throws IOException {
|
||||||
if (csConfProvider instanceof MutableConfigurationProvider) {
|
if (csConfProvider instanceof MutableConfigurationProvider) {
|
||||||
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
|
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
|
||||||
user, confUpdate);
|
user, confUpdate);
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -98,7 +98,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void mutateConfiguration(UserGroupInformation user,
|
public void mutateConfiguration(UserGroupInformation user,
|
||||||
QueueConfigsUpdateInfo confUpdate) throws IOException {
|
SchedConfUpdateInfo confUpdate) throws IOException {
|
||||||
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
|
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
|
||||||
throw new AccessControlException("User is not admin of all modified" +
|
throw new AccessControlException("User is not admin of all modified" +
|
||||||
" queues.");
|
" queues.");
|
||||||
|
@ -126,7 +126,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
|
|
||||||
|
|
||||||
private Map<String, String> constructKeyValueConfUpdate(
|
private Map<String, String> constructKeyValueConfUpdate(
|
||||||
QueueConfigsUpdateInfo mutationInfo) throws IOException {
|
SchedConfUpdateInfo mutationInfo) throws IOException {
|
||||||
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler();
|
||||||
CapacitySchedulerConfiguration proposedConf =
|
CapacitySchedulerConfiguration proposedConf =
|
||||||
new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
|
new CapacitySchedulerConfiguration(cs.getConfiguration(), false);
|
||||||
|
@ -140,6 +140,10 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
|
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
|
||||||
updateQueue(updateQueueInfo, proposedConf, confUpdate);
|
updateQueue(updateQueueInfo, proposedConf, confUpdate);
|
||||||
}
|
}
|
||||||
|
for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
|
||||||
|
.entrySet()) {
|
||||||
|
confUpdate.put(global.getKey(), global.getValue());
|
||||||
|
}
|
||||||
return confUpdate;
|
return confUpdate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,15 +22,17 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -40,16 +42,29 @@ import java.util.Set;
|
||||||
public class QueueAdminConfigurationMutationACLPolicy implements
|
public class QueueAdminConfigurationMutationACLPolicy implements
|
||||||
ConfigurationMutationACLPolicy {
|
ConfigurationMutationACLPolicy {
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
|
private YarnAuthorizationProvider authorizer;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf, RMContext context) {
|
public void init(Configuration config, RMContext context) {
|
||||||
|
this.conf = config;
|
||||||
this.rmContext = context;
|
this.rmContext = context;
|
||||||
|
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isMutationAllowed(UserGroupInformation user,
|
public boolean isMutationAllowed(UserGroupInformation user,
|
||||||
QueueConfigsUpdateInfo confUpdate) {
|
SchedConfUpdateInfo confUpdate) {
|
||||||
|
// If there are global config changes, check if user is admin.
|
||||||
|
Map<String, String> globalParams = confUpdate.getGlobalParams();
|
||||||
|
if (globalParams != null && globalParams.size() != 0) {
|
||||||
|
if (!authorizer.isAdmin(user)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if user is admin of all modified queues.
|
||||||
Set<String> queues = new HashSet<>();
|
Set<String> queues = new HashSet<>();
|
||||||
for (QueueConfigInfo addQueueInfo : confUpdate.getAddQueueInfo()) {
|
for (QueueConfigInfo addQueueInfo : confUpdate.getAddQueueInfo()) {
|
||||||
queues.add(addQueueInfo.getQueue());
|
queues.add(addQueueInfo.getQueue());
|
||||||
|
@ -71,7 +86,6 @@ public class QueueAdminConfigurationMutationACLPolicy implements
|
||||||
// Queue is not found, do nothing.
|
// Queue is not found, do nothing.
|
||||||
}
|
}
|
||||||
String parentPath = queuePath;
|
String parentPath = queuePath;
|
||||||
// TODO: handle global config change.
|
|
||||||
while (queueInfo == null) {
|
while (queueInfo == null) {
|
||||||
// We are adding a queue (whose parent we are possibly also adding).
|
// We are adding a queue (whose parent we are possibly also adding).
|
||||||
// Check ACL of lowest parent queue which already exists.
|
// Check ACL of lowest parent queue which already exists.
|
||||||
|
|
|
@ -2415,10 +2415,10 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
@PUT
|
@PUT
|
||||||
@Path("/queues")
|
@Path("/sched-conf")
|
||||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
public synchronized Response updateSchedulerConfiguration(final QueueConfigsUpdateInfo
|
public synchronized Response updateSchedulerConfiguration(final SchedConfUpdateInfo
|
||||||
mutationInfo, @Context HttpServletRequest hsr)
|
mutationInfo, @Context HttpServletRequest hsr)
|
||||||
throws AuthorizationException, InterruptedException {
|
throws AuthorizationException, InterruptedException {
|
||||||
init();
|
init();
|
||||||
|
|
|
@ -19,30 +19,34 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
import javax.xml.bind.annotation.XmlAccessType;
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
import javax.xml.bind.annotation.XmlAccessorType;
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
import javax.xml.bind.annotation.XmlElement;
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlElementWrapper;
|
||||||
import javax.xml.bind.annotation.XmlRootElement;
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Information for making scheduler configuration changes (supports adding,
|
* Information for making scheduler configuration changes (supports adding,
|
||||||
* removing, or updating a queue).
|
* removing, or updating a queue, as well as global scheduler conf changes).
|
||||||
*/
|
*/
|
||||||
@XmlRootElement(name = "schedConf")
|
@XmlRootElement(name = "schedConf")
|
||||||
@XmlAccessorType(XmlAccessType.FIELD)
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
public class QueueConfigsUpdateInfo {
|
public class SchedConfUpdateInfo {
|
||||||
|
|
||||||
@XmlElement(name = "add")
|
@XmlElement(name = "add-queue")
|
||||||
private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>();
|
private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>();
|
||||||
|
|
||||||
@XmlElement(name = "remove")
|
@XmlElement(name = "remove-queue")
|
||||||
private ArrayList<String> removeQueueInfo = new ArrayList<>();
|
private ArrayList<String> removeQueueInfo = new ArrayList<>();
|
||||||
|
|
||||||
@XmlElement(name = "update")
|
@XmlElement(name = "update-queue")
|
||||||
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
|
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
|
||||||
|
|
||||||
public QueueConfigsUpdateInfo() {
|
private HashMap<String, String> global = new HashMap<>();
|
||||||
|
|
||||||
|
public SchedConfUpdateInfo() {
|
||||||
// JAXB needs this
|
// JAXB needs this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,4 +61,9 @@ public class QueueConfigsUpdateInfo {
|
||||||
public ArrayList<QueueConfigInfo> getUpdateQueueInfo() {
|
public ArrayList<QueueConfigInfo> getUpdateQueueInfo() {
|
||||||
return updateQueueInfo;
|
return updateQueueInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@XmlElementWrapper(name = "global-updates")
|
||||||
|
public HashMap<String, String> getGlobalParams() {
|
||||||
|
return global;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueAdminConfigurationMutationACLPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueAdminConfigurationMutationACLPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -77,6 +77,7 @@ public class TestConfigurationMutationACLPolicies {
|
||||||
.thenReturn(false);
|
.thenReturn(false);
|
||||||
when(scheduler.getQueue(eq(queueName))).thenReturn(queue);
|
when(scheduler.getQueue(eq(queueName))).thenReturn(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultPolicy() {
|
public void testDefaultPolicy() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -98,7 +99,7 @@ public class TestConfigurationMutationACLPolicies {
|
||||||
ConfigurationMutationACLPolicy.class);
|
ConfigurationMutationACLPolicy.class);
|
||||||
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
||||||
policy.init(conf, rmContext);
|
policy.init(conf, rmContext);
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
QueueConfigInfo configInfo = new QueueConfigInfo("root.a", EMPTY_MAP);
|
QueueConfigInfo configInfo = new QueueConfigInfo("root.a", EMPTY_MAP);
|
||||||
updateInfo.getUpdateQueueInfo().add(configInfo);
|
updateInfo.getUpdateQueueInfo().add(configInfo);
|
||||||
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
||||||
|
@ -114,7 +115,7 @@ public class TestConfigurationMutationACLPolicies {
|
||||||
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
||||||
policy.init(conf, rmContext);
|
policy.init(conf, rmContext);
|
||||||
// Add root.b.b1. Should check ACL of root.b queue.
|
// Add root.b.b1. Should check ACL of root.b queue.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2", EMPTY_MAP);
|
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2", EMPTY_MAP);
|
||||||
updateInfo.getAddQueueInfo().add(configInfo);
|
updateInfo.getAddQueueInfo().add(configInfo);
|
||||||
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
||||||
|
@ -130,7 +131,7 @@ public class TestConfigurationMutationACLPolicies {
|
||||||
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
||||||
policy.init(conf, rmContext);
|
policy.init(conf, rmContext);
|
||||||
// Add root.b.b1.b11. Should check ACL of root.b queue.
|
// Add root.b.b1.b11. Should check ACL of root.b queue.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2.b21", EMPTY_MAP);
|
QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2.b21", EMPTY_MAP);
|
||||||
updateInfo.getAddQueueInfo().add(configInfo);
|
updateInfo.getAddQueueInfo().add(configInfo);
|
||||||
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
||||||
|
@ -146,9 +147,26 @@ public class TestConfigurationMutationACLPolicies {
|
||||||
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
||||||
policy.init(conf, rmContext);
|
policy.init(conf, rmContext);
|
||||||
// Remove root.b.b1.
|
// Remove root.b.b1.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
updateInfo.getRemoveQueueInfo().add("root.b.b1");
|
updateInfo.getRemoveQueueInfo().add("root.b.b1");
|
||||||
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
||||||
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
|
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueAdminPolicyGlobal() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, GOOD_USER.getShortUserName());
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS,
|
||||||
|
QueueAdminConfigurationMutationACLPolicy.class,
|
||||||
|
ConfigurationMutationACLPolicy.class);
|
||||||
|
policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf);
|
||||||
|
policy.init(conf, rmContext);
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
||||||
|
assertTrue(policy.isMutationAllowed(BAD_USER, updateInfo));
|
||||||
|
updateInfo.getGlobalParams().put("globalKey", "globalValue");
|
||||||
|
assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo));
|
||||||
|
assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -47,8 +47,8 @@ public class TestMutableCSConfigurationProvider {
|
||||||
|
|
||||||
private MutableCSConfigurationProvider confProvider;
|
private MutableCSConfigurationProvider confProvider;
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
private QueueConfigsUpdateInfo goodUpdate;
|
private SchedConfUpdateInfo goodUpdate;
|
||||||
private QueueConfigsUpdateInfo badUpdate;
|
private SchedConfUpdateInfo badUpdate;
|
||||||
private CapacityScheduler cs;
|
private CapacityScheduler cs;
|
||||||
|
|
||||||
private static final UserGroupInformation TEST_USER = UserGroupInformation
|
private static final UserGroupInformation TEST_USER = UserGroupInformation
|
||||||
|
@ -62,14 +62,14 @@ public class TestMutableCSConfigurationProvider {
|
||||||
when(cs.getConfiguration()).thenReturn(
|
when(cs.getConfiguration()).thenReturn(
|
||||||
new CapacitySchedulerConfiguration());
|
new CapacitySchedulerConfiguration());
|
||||||
confProvider = new MutableCSConfigurationProvider(rmContext);
|
confProvider = new MutableCSConfigurationProvider(rmContext);
|
||||||
goodUpdate = new QueueConfigsUpdateInfo();
|
goodUpdate = new SchedConfUpdateInfo();
|
||||||
Map<String, String> goodUpdateMap = new HashMap<>();
|
Map<String, String> goodUpdateMap = new HashMap<>();
|
||||||
goodUpdateMap.put("goodKey", "goodVal");
|
goodUpdateMap.put("goodKey", "goodVal");
|
||||||
QueueConfigInfo goodUpdateInfo = new
|
QueueConfigInfo goodUpdateInfo = new
|
||||||
QueueConfigInfo("root.a", goodUpdateMap);
|
QueueConfigInfo("root.a", goodUpdateMap);
|
||||||
goodUpdate.getUpdateQueueInfo().add(goodUpdateInfo);
|
goodUpdate.getUpdateQueueInfo().add(goodUpdateInfo);
|
||||||
|
|
||||||
badUpdate = new QueueConfigsUpdateInfo();
|
badUpdate = new SchedConfUpdateInfo();
|
||||||
Map<String, String> badUpdateMap = new HashMap<>();
|
Map<String, String> badUpdateMap = new HashMap<>();
|
||||||
badUpdateMap.put("badKey", "badVal");
|
badUpdateMap.put("badKey", "badVal");
|
||||||
QueueConfigInfo badUpdateInfo = new
|
QueueConfigInfo badUpdateInfo = new
|
||||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -168,7 +168,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
ClientResponse response;
|
ClientResponse response;
|
||||||
|
|
||||||
// Add parent queue root.d with two children d1 and d2.
|
// Add parent queue root.d with two children d1 and d2.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
Map<String, String> d1Capacity = new HashMap<>();
|
Map<String, String> d1Capacity = new HashMap<>();
|
||||||
d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
|
d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
|
||||||
d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25");
|
d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25");
|
||||||
|
@ -187,9 +187,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
updateInfo.getAddQueueInfo().add(d);
|
updateInfo.getAddQueueInfo().add(d);
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
@ -211,7 +211,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
ClientResponse response;
|
ClientResponse response;
|
||||||
|
|
||||||
// Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
|
// Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
Map<String, String> dCapacity = new HashMap<>();
|
Map<String, String> dCapacity = new HashMap<>();
|
||||||
dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
|
dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
|
||||||
Map<String, String> bCapacity = new HashMap<>();
|
Map<String, String> bCapacity = new HashMap<>();
|
||||||
|
@ -222,9 +222,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
updateInfo.getUpdateQueueInfo().add(b);
|
updateInfo.getUpdateQueueInfo().add(b);
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
@ -244,13 +244,13 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
|
|
||||||
stopQueue("root.a.a2");
|
stopQueue("root.a.a2");
|
||||||
// Remove root.a.a2
|
// Remove root.a.a2
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
updateInfo.getRemoveQueueInfo().add("root.a.a2");
|
updateInfo.getRemoveQueueInfo().add("root.a.a2");
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
@ -269,13 +269,13 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
|
|
||||||
stopQueue("root.c", "root.c.c1");
|
stopQueue("root.c", "root.c.c1");
|
||||||
// Remove root.c (parent queue)
|
// Remove root.c (parent queue)
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
updateInfo.getRemoveQueueInfo().add("root.c");
|
updateInfo.getRemoveQueueInfo().add("root.c");
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
@ -294,7 +294,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
|
|
||||||
stopQueue("root.a", "root.a.a1", "root.a.a2");
|
stopQueue("root.a", "root.a.a1", "root.a.a2");
|
||||||
// Remove root.a (parent queue) with capacity 25
|
// Remove root.a (parent queue) with capacity 25
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
updateInfo.getRemoveQueueInfo().add("root.a");
|
updateInfo.getRemoveQueueInfo().add("root.a");
|
||||||
|
|
||||||
// Set root.b capacity to 100
|
// Set root.b capacity to 100
|
||||||
|
@ -304,9 +304,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
updateInfo.getUpdateQueueInfo().add(b);
|
updateInfo.getUpdateQueueInfo().add(b);
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
@ -326,7 +326,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
|
|
||||||
stopQueue("root.b", "root.c", "root.c.c1");
|
stopQueue("root.b", "root.c", "root.c.c1");
|
||||||
// Remove root.b and root.c
|
// Remove root.b and root.c
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
updateInfo.getRemoveQueueInfo().add("root.b");
|
updateInfo.getRemoveQueueInfo().add("root.b");
|
||||||
updateInfo.getRemoveQueueInfo().add("root.c");
|
updateInfo.getRemoveQueueInfo().add("root.c");
|
||||||
Map<String, String> aCapacity = new HashMap<>();
|
Map<String, String> aCapacity = new HashMap<>();
|
||||||
|
@ -336,9 +336,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
updateInfo.getUpdateQueueInfo().add(configInfo);
|
updateInfo.getUpdateQueueInfo().add(configInfo);
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
@ -354,7 +354,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
ClientResponse response;
|
ClientResponse response;
|
||||||
|
|
||||||
// Set state of queues to STOPPED.
|
// Set state of queues to STOPPED.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
Map<String, String> stoppedParam = new HashMap<>();
|
Map<String, String> stoppedParam = new HashMap<>();
|
||||||
stoppedParam.put(CapacitySchedulerConfiguration.STATE,
|
stoppedParam.put(CapacitySchedulerConfiguration.STATE,
|
||||||
QueueState.STOPPED.toString());
|
QueueState.STOPPED.toString());
|
||||||
|
@ -364,9 +364,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
}
|
}
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
@ -384,7 +384,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
ClientResponse response;
|
ClientResponse response;
|
||||||
|
|
||||||
// Update config value.
|
// Update config value.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
Map<String, String> updateParam = new HashMap<>();
|
Map<String, String> updateParam = new HashMap<>();
|
||||||
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
|
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
|
||||||
"0.2");
|
"0.2");
|
||||||
|
@ -399,9 +399,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
0.001f);
|
0.001f);
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
@ -417,9 +417,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
|
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
@ -437,7 +437,7 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
ClientResponse response;
|
ClientResponse response;
|
||||||
|
|
||||||
// Update root.a and root.b capacity to 50.
|
// Update root.a and root.b capacity to 50.
|
||||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
Map<String, String> updateParam = new HashMap<>();
|
Map<String, String> updateParam = new HashMap<>();
|
||||||
updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50");
|
updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50");
|
||||||
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
|
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
|
||||||
|
@ -447,9 +447,9 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
|
|
||||||
response =
|
response =
|
||||||
r.path("ws").path("v1").path("cluster")
|
r.path("ws").path("v1").path("cluster")
|
||||||
.path("queues").queryParam("user.name", userName)
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
.accept(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
MediaType.APPLICATION_JSON)
|
MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
@ -459,6 +459,47 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGlobalConfChange() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
|
||||||
|
ClientResponse response;
|
||||||
|
|
||||||
|
// Set maximum-applications to 30000.
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
updateInfo.getGlobalParams().put(CapacitySchedulerConfiguration.PREFIX +
|
||||||
|
"maximum-applications", "30000");
|
||||||
|
|
||||||
|
response =
|
||||||
|
r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
|
MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
CapacitySchedulerConfiguration newCSConf =
|
||||||
|
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||||
|
assertEquals(30000, newCSConf.getMaximumSystemApplications());
|
||||||
|
|
||||||
|
updateInfo.getGlobalParams().put(CapacitySchedulerConfiguration.PREFIX +
|
||||||
|
"maximum-applications", null);
|
||||||
|
// Unset maximum-applications. Should be set to default.
|
||||||
|
response =
|
||||||
|
r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("sched-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(toJson(updateInfo, SchedConfUpdateInfo.class),
|
||||||
|
MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
newCSConf =
|
||||||
|
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||||
|
assertEquals(CapacitySchedulerConfiguration
|
||||||
|
.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS,
|
||||||
|
newCSConf.getMaximumSystemApplications());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue