YARN-5952. Create REST API for changing YARN scheduler configurations. (Jonathan Hung via wangda)

This commit is contained in:
Wangda Tan 2017-04-03 10:12:01 -07:00 committed by Jonathan Hung
parent 072527b31a
commit 53a1268125
10 changed files with 857 additions and 13 deletions

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.util.Map;
/**
* Interface for a scheduler that supports changing configuration at runtime.
*
*/
public interface MutableConfScheduler extends ResourceScheduler {
/**
* Update the scheduler's configuration.
* @param user Caller of this update
* @param confUpdate key-value map of the configuration update
* @throws IOException if update is invalid
*/
void updateConfiguration(UserGroupInformation user,
Map<String, String> confUpdate) throws IOException;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.Map;
/**
@ -29,7 +30,9 @@ public interface MutableConfigurationProvider {
* Update the scheduler configuration with the provided key value pairs.
* @param user User issuing the request
* @param confUpdate Key-value pairs for configurations to be updated.
* @throws IOException if scheduler could not be reinitialized
*/
void mutateConfiguration(String user, Map<String, String> confUpdate);
void mutateConfiguration(String user, Map<String, String> confUpdate)
throws IOException;
}

View File

@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@ -151,7 +153,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class CapacityScheduler extends
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
ResourceAllocationCommitter {
ResourceAllocationCommitter, MutableConfScheduler {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
@ -2611,4 +2613,16 @@ public class CapacityScheduler extends
// In seconds
return ((LeafQueue) queue).getMaximumApplicationLifetime();
}
@Override
public void updateConfiguration(UserGroupInformation user,
Map<String, String> confUpdate) throws IOException {
if (csConfProvider instanceof MutableConfigurationProvider) {
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
user.getShortUserName(), confUpdate);
} else {
throw new UnsupportedOperationException("Configured CS configuration " +
"provider does not support updating configuration.");
}
}
}

View File

@ -58,9 +58,13 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore {
if (isValid) {
Map<String, String> mutations = mutation.getUpdates();
for (Map.Entry<String, String> kv : mutations.entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
} else {
schedConf.set(kv.getKey(), kv.getValue());
}
}
}
return true;
}
mutation = pendingMutations.poll();

View File

@ -60,34 +60,44 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
Configuration initialSchedConf = new Configuration(false);
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
this.schedConf = initialSchedConf;
confStore.initialize(config, initialSchedConf);
this.schedConf = new Configuration(false);
// We need to explicitly set the key-values in schedConf, otherwise
// these configuration keys cannot be deleted when
// configuration is reloaded.
for (Map.Entry<String, String> kv : initialSchedConf) {
schedConf.set(kv.getKey(), kv.getValue());
}
confStore.initialize(config, schedConf);
this.conf = config;
}
@Override
public CapacitySchedulerConfiguration loadConfiguration(Configuration
configuration) throws IOException {
Configuration loadedConf = new Configuration(configuration);
loadedConf.addResource(schedConf);
Configuration loadedConf = new Configuration(schedConf);
loadedConf.addResource(configuration);
return new CapacitySchedulerConfiguration(loadedConf, false);
}
@Override
public void mutateConfiguration(String user,
Map<String, String> confUpdate) {
Map<String, String> confUpdate) throws IOException {
Configuration oldConf = new Configuration(schedConf);
LogMutation log = new LogMutation(confUpdate, user);
long id = confStore.logMutation(log);
for (Map.Entry<String, String> kv : confUpdate.entrySet()) {
if (kv.getValue() == null) {
schedConf.unset(kv.getKey());
} else {
schedConf.set(kv.getKey(), kv.getValue());
}
}
try {
rmContext.getScheduler().reinitialize(conf, rmContext);
} catch (IOException e) {
schedConf = oldConf;
confStore.confirmMutation(id, false);
return;
throw e;
}
confStore.confirmMutation(id, true);
}

View File

@ -55,7 +55,8 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import com.google.common.base.Joiner;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -133,11 +134,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@ -2411,4 +2415,169 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return rm.getClientRMService().getContainers(request).getContainerList();
}
@PUT
@Path("/queues")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public synchronized Response updateSchedulerConfiguration(final QueueConfigsUpdateInfo
mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {
init();
final UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr,
true);
ApplicationACLsManager aclsManager = rm.getApplicationACLsManager();
if (aclsManager.areACLsEnabled()) {
if (callerUGI == null || !aclsManager.isAdmin(callerUGI)) {
String msg = "Only admins can carry out this operation.";
throw new ForbiddenException(msg);
}
}
final ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler) {
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, YarnException {
Map<String, String> confUpdate =
constructKeyValueConfUpdate(mutationInfo);
((CapacityScheduler) scheduler).updateConfiguration(callerUGI,
confUpdate);
return null;
}
});
} catch (IOException e) {
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
return Response.status(Status.OK).entity("Configuration change " +
"successfully applied.").build();
} else {
return Response.status(Status.BAD_REQUEST)
.entity("Configuration change only supported by CapacityScheduler.")
.build();
}
}
private Map<String, String> constructKeyValueConfUpdate(
QueueConfigsUpdateInfo mutationInfo) throws IOException {
CapacitySchedulerConfiguration currentConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
CapacitySchedulerConfiguration proposedConf =
new CapacitySchedulerConfiguration(currentConf, false);
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);
}
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
addQueue(addQueueInfo, proposedConf, confUpdate);
}
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
updateQueue(updateQueueInfo, proposedConf, confUpdate);
}
return confUpdate;
}
private void removeQueue(
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (queueToRemove == null) {
return;
} else {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
String queueName = queueToRemove.substring(
queueToRemove.lastIndexOf('.') + 1);
CSQueue queue = cs.getQueue(queueName);
if (queue == null ||
!queue.getQueuePath().equals(queueToRemove)) {
throw new IOException("Queue " + queueToRemove + " not found");
} else if (queueToRemove.lastIndexOf('.') == -1) {
throw new IOException("Can't remove queue " + queueToRemove);
}
String parentQueuePath = queueToRemove.substring(0, queueToRemove
.lastIndexOf('.'));
String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
List<String> newSiblingQueues = new ArrayList<>();
for (String siblingQueue : siblingQueues) {
if (!siblingQueue.equals(queueName)) {
newSiblingQueues.add(siblingQueue);
}
}
proposedConf.setQueues(parentQueuePath, newSiblingQueues
.toArray(new String[0]));
String queuesConfig = CapacitySchedulerConfiguration.PREFIX +
parentQueuePath + CapacitySchedulerConfiguration.DOT +
CapacitySchedulerConfiguration.QUEUES;
if (newSiblingQueues.size() == 0) {
confUpdate.put(queuesConfig, null);
} else {
confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
}
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
.entrySet()) {
proposedConf.unset(confRemove.getKey());
confUpdate.put(confRemove.getKey(), null);
}
}
}
private void addQueue(
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (addInfo == null) {
return;
} else {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
String queuePath = addInfo.getQueue();
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
if (cs.getQueue(queueName) != null) {
throw new IOException("Can't add existing queue " + queuePath);
} else if (queuePath.lastIndexOf('.') == -1) {
throw new IOException("Can't add invalid queue " + queuePath);
}
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
String[] siblings = proposedConf.getQueues(parentQueue);
List<String> siblingQueues = siblings == null ? new ArrayList<>() :
new ArrayList<>(Arrays.<String>asList(siblings));
siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
proposedConf.setQueues(parentQueue,
siblingQueues.toArray(new String[0]));
confUpdate.put(CapacitySchedulerConfiguration.PREFIX +
parentQueue + CapacitySchedulerConfiguration.DOT +
CapacitySchedulerConfiguration.QUEUES,
Joiner.on(',').join(siblingQueues));
String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
queuePath + CapacitySchedulerConfiguration.DOT;
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
if (kv.getValue() == null) {
proposedConf.unset(keyPrefix + kv.getKey());
} else {
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
}
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
}
}
}
private void updateQueue(QueueConfigInfo updateInfo,
CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) {
if (updateInfo == null) {
return;
} else {
String queuePath = updateInfo.getQueue();
String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
queuePath + CapacitySchedulerConfiguration.DOT;
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
if (kv.getValue() == null) {
proposedConf.unset(keyPrefix + kv.getKey());
} else {
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
}
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
}
}
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.HashMap;
import java.util.Map;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Information for adding or updating a queue to scheduler configuration
* for this queue.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueConfigInfo {
@XmlElement(name = "queueName")
private String queue;
private HashMap<String, String> params = new HashMap<>();
public QueueConfigInfo() { }
public QueueConfigInfo(String queue, Map<String, String> params) {
this.queue = queue;
this.params = new HashMap<>(params);
}
public String getQueue() {
return this.queue;
}
public HashMap<String, String> getParams() {
return this.params;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.ArrayList;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Information for making scheduler configuration changes (supports adding,
* removing, or updating a queue).
*/
@XmlRootElement(name = "schedConf")
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueConfigsUpdateInfo {
@XmlElement(name = "add")
private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>();
@XmlElement(name = "remove")
private ArrayList<String> removeQueueInfo = new ArrayList<>();
@XmlElement(name = "update")
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
public QueueConfigsUpdateInfo() {
// JAXB needs this
}
public ArrayList<QueueConfigInfo> getAddQueueInfo() {
return addQueueInfo;
}
public ArrayList<String> getRemoveQueueInfo() {
return removeQueueInfo;
}
public ArrayList<QueueConfigInfo> getUpdateQueueInfo() {
return updateQueueInfo;
}
}

View File

@ -77,7 +77,11 @@ public class TestMutableCSConfigurationProvider {
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
any(RMContext.class));
try {
confProvider.mutateConfiguration(TEST_USER, badUpdate);
} catch (IOException e) {
// Expected exception.
}
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
}
}

View File

@ -0,0 +1,483 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Test scheduler configuration mutation via REST API.
*/
public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
private static final File CONF_FILE = new File(new File("target",
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
private static final File OLD_CONF_FILE = new File(new File("target",
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
private static MockRM rm;
private static String userName;
private static CapacitySchedulerConfiguration csConf;
private static YarnConfiguration conf;
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
try {
userName = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ioe) {
throw new RuntimeException("Unable to get current user name "
+ ioe.getMessage(), ioe);
}
csConf = new CapacitySchedulerConfiguration(new Configuration(false),
false);
setupQueueConfiguration(csConf);
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
try {
if (CONF_FILE.exists()) {
if (!CONF_FILE.renameTo(OLD_CONF_FILE)) {
throw new RuntimeException("Failed to rename conf file");
}
}
FileOutputStream out = new FileOutputStream(CONF_FILE);
csConf.writeXml(out);
out.close();
} catch (IOException e) {
throw new RuntimeException("Failed to write XML file", e);
}
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
filter("/*").through(TestRMWebServicesAppsModification
.TestRMCustomAuthFilter.class);
}
});
public class GuiceServletConfig extends GuiceServletContextListener {
@Override
protected Injector getInjector() {
return injector;
}
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
public TestRMWebServicesConfigurationMutation() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
private static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
config.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{"a", "b", "c"});
final String a = CapacitySchedulerConfiguration.ROOT + ".a";
config.setCapacity(a, 25f);
config.setMaximumCapacity(a, 50f);
final String a1 = a + ".a1";
final String a2 = a + ".a2";
config.setQueues(a, new String[]{"a1", "a2"});
config.setCapacity(a1, 100f);
config.setCapacity(a2, 0f);
final String b = CapacitySchedulerConfiguration.ROOT + ".b";
config.setCapacity(b, 75f);
final String c = CapacitySchedulerConfiguration.ROOT + ".c";
config.setCapacity(c, 0f);
final String c1 = c + ".c1";
config.setQueues(c, new String[] {"c1"});
config.setCapacity(c1, 0f);
}
@Test
public void testAddNestedQueue() throws Exception {
WebResource r = resource();
ClientResponse response;
// Add parent queue root.d with two children d1 and d2.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
Map<String, String> d1Capacity = new HashMap<>();
d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25");
Map<String, String> nearEmptyCapacity = new HashMap<>();
nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4");
nearEmptyCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY,
"1E-4");
Map<String, String> d2Capacity = new HashMap<>();
d2Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "75");
d2Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "75");
QueueConfigInfo d1 = new QueueConfigInfo("root.d.d1", d1Capacity);
QueueConfigInfo d2 = new QueueConfigInfo("root.d.d2", d2Capacity);
QueueConfigInfo d = new QueueConfigInfo("root.d", nearEmptyCapacity);
updateInfo.getAddQueueInfo().add(d1);
updateInfo.getAddQueueInfo().add(d2);
updateInfo.getAddQueueInfo().add(d);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(4, newCSConf.getQueues("root").length);
assertEquals(2, newCSConf.getQueues("root.d").length);
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d1"),
0.01f);
assertEquals(75.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d2"),
0.01f);
}
@Test
public void testAddWithUpdate() throws Exception {
WebResource r = resource();
ClientResponse response;
// Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
Map<String, String> dCapacity = new HashMap<>();
dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
Map<String, String> bCapacity = new HashMap<>();
bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "50");
QueueConfigInfo d = new QueueConfigInfo("root.d", dCapacity);
QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity);
updateInfo.getAddQueueInfo().add(d);
updateInfo.getUpdateQueueInfo().add(b);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(4, newCSConf.getQueues("root").length);
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d"), 0.01f);
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
}
@Test
public void testRemoveQueue() throws Exception {
WebResource r = resource();
ClientResponse response;
stopQueue("root.a.a2");
// Remove root.a.a2
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.a.a2");
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(1, newCSConf.getQueues("root.a").length);
assertEquals("a1", newCSConf.getQueues("root.a")[0]);
}
@Test
public void testRemoveParentQueue() throws Exception {
WebResource r = resource();
ClientResponse response;
stopQueue("root.c", "root.c.c1");
// Remove root.c (parent queue)
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.c");
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(2, newCSConf.getQueues("root").length);
assertNull(newCSConf.getQueues("root.c"));
}
@Test
public void testRemoveParentQueueWithCapacity() throws Exception {
WebResource r = resource();
ClientResponse response;
stopQueue("root.a", "root.a.a1", "root.a.a2");
// Remove root.a (parent queue) with capacity 25
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.a");
// Set root.b capacity to 100
Map<String, String> bCapacity = new HashMap<>();
bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity);
updateInfo.getUpdateQueueInfo().add(b);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(2, newCSConf.getQueues("root").length);
assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity("root.b"),
0.01f);
}
@Test
public void testRemoveMultipleQueues() throws Exception {
WebResource r = resource();
ClientResponse response;
stopQueue("root.b", "root.c", "root.c.c1");
// Remove root.b and root.c
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
updateInfo.getRemoveQueueInfo().add("root.b");
updateInfo.getRemoveQueueInfo().add("root.c");
Map<String, String> aCapacity = new HashMap<>();
aCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
aCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "100");
QueueConfigInfo configInfo = new QueueConfigInfo("root.a", aCapacity);
updateInfo.getUpdateQueueInfo().add(configInfo);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(1, newCSConf.getQueues("root").length);
}
private void stopQueue(String... queuePaths) throws Exception {
WebResource r = resource();
ClientResponse response;
// Set state of queues to STOPPED.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
Map<String, String> stoppedParam = new HashMap<>();
stoppedParam.put(CapacitySchedulerConfiguration.STATE,
QueueState.STOPPED.toString());
for (String queue : queuePaths) {
QueueConfigInfo stoppedInfo = new QueueConfigInfo(queue, stoppedParam);
updateInfo.getUpdateQueueInfo().add(stoppedInfo);
}
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
for (String queue : queuePaths) {
assertEquals(QueueState.STOPPED, newCSConf.getState(queue));
}
}
@Test
public void testUpdateQueue() throws Exception {
WebResource r = resource();
ClientResponse response;
// Update config value.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
Map<String, String> updateParam = new HashMap<>();
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
"0.2");
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
assertEquals(CapacitySchedulerConfiguration
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
cs.getConfiguration()
.getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
0.001f);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf = cs.getConfiguration();
assertEquals(0.2f, newCSConf
.getMaximumApplicationMasterResourcePerQueuePercent("root.a"), 0.001f);
// Remove config. Config value should be reverted to default.
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
null);
aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
updateInfo.getUpdateQueueInfo().clear();
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
newCSConf = cs.getConfiguration();
assertEquals(CapacitySchedulerConfiguration
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, newCSConf
.getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
0.001f);
}
@Test
public void testUpdateQueueCapacity() throws Exception {
WebResource r = resource();
ClientResponse response;
// Update root.a and root.b capacity to 50.
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
Map<String, String> updateParam = new HashMap<>();
updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50");
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
QueueConfigInfo bUpdateInfo = new QueueConfigInfo("root.b", updateParam);
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
updateInfo.getUpdateQueueInfo().add(bUpdateInfo);
response =
r.path("ws").path("v1").path("cluster")
.path("queues").queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON)
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
assertEquals(Status.OK.getStatusCode(), response.getStatus());
CapacitySchedulerConfiguration newCSConf =
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.a"), 0.01f);
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
}
@Override
@After
public void tearDown() throws Exception {
if (rm != null) {
rm.stop();
}
CONF_FILE.delete();
if (!OLD_CONF_FILE.renameTo(CONF_FILE)) {
throw new RuntimeException("Failed to re-copy old configuration file");
}
super.tearDown();
}
@SuppressWarnings("rawtypes")
private String toJson(Object nsli, Class klass) throws Exception {
StringWriter sw = new StringWriter();
JSONJAXBContext ctx = new JSONJAXBContext(klass);
JSONMarshaller jm = ctx.createJSONMarshaller();
jm.marshallToJSON(nsli, sw);
return sw.toString();
}
}