YARN-5952. Create REST API for changing YARN scheduler configurations. (Jonathan Hung via wangda)
This commit is contained in:
parent
28754646e5
commit
d1514bacd1
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface for a scheduler that supports changing configuration at runtime.
|
||||
*
|
||||
*/
|
||||
public interface MutableConfScheduler extends ResourceScheduler {
|
||||
|
||||
/**
|
||||
* Update the scheduler's configuration.
|
||||
* @param user Caller of this update
|
||||
* @param confUpdate key-value map of the configuration update
|
||||
* @throws IOException if update is invalid
|
||||
*/
|
||||
void updateConfiguration(UserGroupInformation user,
|
||||
Map<String, String> confUpdate) throws IOException;
|
||||
|
||||
}
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -29,7 +30,9 @@ public interface MutableConfigurationProvider {
|
|||
* Update the scheduler configuration with the provided key value pairs.
|
||||
* @param user User issuing the request
|
||||
* @param confUpdate Key-value pairs for configurations to be updated.
|
||||
* @throws IOException if scheduler could not be reinitialized
|
||||
*/
|
||||
void mutateConfiguration(String user, Map<String, String> confUpdate);
|
||||
void mutateConfiguration(String user, Map<String, String> confUpdate)
|
||||
throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
|
@ -150,7 +152,7 @@ import com.google.common.util.concurrent.SettableFuture;
|
|||
public class CapacityScheduler extends
|
||||
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
|
||||
PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
|
||||
ResourceAllocationCommitter {
|
||||
ResourceAllocationCommitter, MutableConfScheduler {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
|
||||
|
||||
|
@ -2610,4 +2612,16 @@ public class CapacityScheduler extends
|
|||
// In seconds
|
||||
return ((LeafQueue) queue).getMaximumApplicationLifetime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateConfiguration(UserGroupInformation user,
|
||||
Map<String, String> confUpdate) throws IOException {
|
||||
if (csConfProvider instanceof MutableConfigurationProvider) {
|
||||
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
|
||||
user.getShortUserName(), confUpdate);
|
||||
} else {
|
||||
throw new UnsupportedOperationException("Configured CS configuration " +
|
||||
"provider does not support updating configuration.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,11 @@ public class InMemoryConfigurationStore implements YarnConfigurationStore {
|
|||
if (isValid) {
|
||||
Map<String, String> mutations = mutation.getUpdates();
|
||||
for (Map.Entry<String, String> kv : mutations.entrySet()) {
|
||||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
if (kv.getValue() == null) {
|
||||
schedConf.unset(kv.getKey());
|
||||
} else {
|
||||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -60,34 +60,44 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
|||
}
|
||||
Configuration initialSchedConf = new Configuration(false);
|
||||
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||
this.schedConf = initialSchedConf;
|
||||
confStore.initialize(config, initialSchedConf);
|
||||
this.schedConf = new Configuration(false);
|
||||
// We need to explicitly set the key-values in schedConf, otherwise
|
||||
// these configuration keys cannot be deleted when
|
||||
// configuration is reloaded.
|
||||
for (Map.Entry<String, String> kv : initialSchedConf) {
|
||||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
confStore.initialize(config, schedConf);
|
||||
this.conf = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CapacitySchedulerConfiguration loadConfiguration(Configuration
|
||||
configuration) throws IOException {
|
||||
Configuration loadedConf = new Configuration(configuration);
|
||||
loadedConf.addResource(schedConf);
|
||||
Configuration loadedConf = new Configuration(schedConf);
|
||||
loadedConf.addResource(configuration);
|
||||
return new CapacitySchedulerConfiguration(loadedConf, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mutateConfiguration(String user,
|
||||
Map<String, String> confUpdate) {
|
||||
Map<String, String> confUpdate) throws IOException {
|
||||
Configuration oldConf = new Configuration(schedConf);
|
||||
LogMutation log = new LogMutation(confUpdate, user);
|
||||
long id = confStore.logMutation(log);
|
||||
for (Map.Entry<String, String> kv : confUpdate.entrySet()) {
|
||||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
if (kv.getValue() == null) {
|
||||
schedConf.unset(kv.getKey());
|
||||
} else {
|
||||
schedConf.set(kv.getKey(), kv.getValue());
|
||||
}
|
||||
}
|
||||
try {
|
||||
rmContext.getScheduler().reinitialize(conf, rmContext);
|
||||
} catch (IOException e) {
|
||||
schedConf = oldConf;
|
||||
confStore.confirmMutation(id, false);
|
||||
return;
|
||||
throw e;
|
||||
}
|
||||
confStore.confirmMutation(id, true);
|
||||
}
|
||||
|
|
|
@ -55,7 +55,8 @@ import javax.ws.rs.core.HttpHeaders;
|
|||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -134,11 +135,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
|
@ -2454,4 +2458,170 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
GetContainersRequest request) throws YarnException, IOException {
|
||||
return rm.getClientRMService().getContainers(request).getContainerList();
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("/queues")
|
||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
public Response updateSchedulerConfiguration(QueueConfigsUpdateInfo
|
||||
mutationInfo, @Context HttpServletRequest hsr)
|
||||
throws AuthorizationException, InterruptedException {
|
||||
init();
|
||||
|
||||
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||
ApplicationACLsManager aclsManager = rm.getApplicationACLsManager();
|
||||
if (aclsManager.areACLsEnabled()) {
|
||||
if (callerUGI == null || !aclsManager.isAdmin(callerUGI)) {
|
||||
String msg = "Only admins can carry out this operation.";
|
||||
throw new ForbiddenException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
if (scheduler instanceof MutableConfScheduler) {
|
||||
try {
|
||||
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws IOException, YarnException {
|
||||
Map<String, String> confUpdate =
|
||||
constructKeyValueConfUpdate(mutationInfo);
|
||||
((CapacityScheduler) scheduler).updateConfiguration(callerUGI,
|
||||
confUpdate);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
|
||||
.build();
|
||||
}
|
||||
return Response.status(Status.OK).entity("Configuration change " +
|
||||
"successfully applied.").build();
|
||||
} else {
|
||||
return Response.status(Status.BAD_REQUEST)
|
||||
.entity("Configuration change only supported by CapacityScheduler.")
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> constructKeyValueConfUpdate(
|
||||
QueueConfigsUpdateInfo mutationInfo) throws IOException {
|
||||
CapacitySchedulerConfiguration currentConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
CapacitySchedulerConfiguration proposedConf =
|
||||
new CapacitySchedulerConfiguration(currentConf, false);
|
||||
Map<String, String> confUpdate = new HashMap<>();
|
||||
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
|
||||
removeQueue(queueToRemove, proposedConf, confUpdate);
|
||||
}
|
||||
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
|
||||
addQueue(addQueueInfo, proposedConf, confUpdate);
|
||||
}
|
||||
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
|
||||
updateQueue(updateQueueInfo, proposedConf, confUpdate);
|
||||
}
|
||||
return confUpdate;
|
||||
}
|
||||
|
||||
private void removeQueue(
|
||||
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
|
||||
Map<String, String> confUpdate) throws IOException {
|
||||
if (queueToRemove == null) {
|
||||
return;
|
||||
} else {
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
String queueName = queueToRemove.substring(
|
||||
queueToRemove.lastIndexOf('.') + 1);
|
||||
CSQueue queue = cs.getQueue(queueName);
|
||||
if (queue == null ||
|
||||
!queue.getQueuePath().equals(queueToRemove)) {
|
||||
throw new IOException("Queue " + queueToRemove + " not found");
|
||||
} else if (queueToRemove.lastIndexOf('.') == -1) {
|
||||
throw new IOException("Can't remove queue " + queueToRemove);
|
||||
}
|
||||
String parentQueuePath = queueToRemove.substring(0, queueToRemove
|
||||
.lastIndexOf('.'));
|
||||
String[] siblingQueues = proposedConf.getQueues(parentQueuePath);
|
||||
List<String> newSiblingQueues = new ArrayList<>();
|
||||
for (String siblingQueue : siblingQueues) {
|
||||
if (!siblingQueue.equals(queueName)) {
|
||||
newSiblingQueues.add(siblingQueue);
|
||||
}
|
||||
}
|
||||
proposedConf.setQueues(parentQueuePath, newSiblingQueues
|
||||
.toArray(new String[0]));
|
||||
String queuesConfig = CapacitySchedulerConfiguration.PREFIX +
|
||||
parentQueuePath + CapacitySchedulerConfiguration.DOT +
|
||||
CapacitySchedulerConfiguration.QUEUES;
|
||||
if (newSiblingQueues.size() == 0) {
|
||||
confUpdate.put(queuesConfig, null);
|
||||
} else {
|
||||
confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues));
|
||||
}
|
||||
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
|
||||
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
|
||||
.entrySet()) {
|
||||
proposedConf.unset(confRemove.getKey());
|
||||
confUpdate.put(confRemove.getKey(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addQueue(
|
||||
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
|
||||
Map<String, String> confUpdate) throws IOException {
|
||||
if (addInfo == null) {
|
||||
return;
|
||||
} else {
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
String queuePath = addInfo.getQueue();
|
||||
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
|
||||
if (cs.getQueue(queueName) != null) {
|
||||
throw new IOException("Can't add existing queue " + queuePath);
|
||||
} else if (queuePath.lastIndexOf('.') == -1) {
|
||||
throw new IOException("Can't add invalid queue " + queuePath);
|
||||
}
|
||||
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
|
||||
String[] siblings = proposedConf.getQueues(parentQueue);
|
||||
List<String> siblingQueues = siblings == null ? new ArrayList<>() :
|
||||
new ArrayList<>(Arrays.<String>asList(siblings));
|
||||
siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
|
||||
proposedConf.setQueues(parentQueue,
|
||||
siblingQueues.toArray(new String[0]));
|
||||
confUpdate.put(CapacitySchedulerConfiguration.PREFIX +
|
||||
parentQueue + CapacitySchedulerConfiguration.DOT +
|
||||
CapacitySchedulerConfiguration.QUEUES,
|
||||
Joiner.on(',').join(siblingQueues));
|
||||
String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
|
||||
queuePath + CapacitySchedulerConfiguration.DOT;
|
||||
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
|
||||
if (kv.getValue() == null) {
|
||||
proposedConf.unset(keyPrefix + kv.getKey());
|
||||
} else {
|
||||
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
|
||||
}
|
||||
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateQueue(QueueConfigInfo updateInfo,
|
||||
CapacitySchedulerConfiguration proposedConf,
|
||||
Map<String, String> confUpdate) {
|
||||
if (updateInfo == null) {
|
||||
return;
|
||||
} else {
|
||||
String queuePath = updateInfo.getQueue();
|
||||
String keyPrefix = CapacitySchedulerConfiguration.PREFIX +
|
||||
queuePath + CapacitySchedulerConfiguration.DOT;
|
||||
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
|
||||
if (kv.getValue() == null) {
|
||||
proposedConf.unset(keyPrefix + kv.getKey());
|
||||
} else {
|
||||
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
|
||||
}
|
||||
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
/**
|
||||
* Information for adding or updating a queue to scheduler configuration
|
||||
* for this queue.
|
||||
*/
|
||||
@XmlRootElement
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class QueueConfigInfo {
|
||||
|
||||
@XmlElement(name = "queueName")
|
||||
private String queue;
|
||||
|
||||
private HashMap<String, String> params = new HashMap<>();
|
||||
|
||||
public QueueConfigInfo() { }
|
||||
|
||||
public QueueConfigInfo(String queue, Map<String, String> params) {
|
||||
this.queue = queue;
|
||||
this.params = new HashMap<>(params);
|
||||
}
|
||||
|
||||
public String getQueue() {
|
||||
return this.queue;
|
||||
}
|
||||
|
||||
public HashMap<String, String> getParams() {
|
||||
return this.params;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
/**
|
||||
* Information for making scheduler configuration changes (supports adding,
|
||||
* removing, or updating a queue).
|
||||
*/
|
||||
@XmlRootElement(name = "schedConf")
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
public class QueueConfigsUpdateInfo {
|
||||
|
||||
@XmlElement(name = "add")
|
||||
private ArrayList<QueueConfigInfo> addQueueInfo = new ArrayList<>();
|
||||
|
||||
@XmlElement(name = "remove")
|
||||
private ArrayList<String> removeQueueInfo = new ArrayList<>();
|
||||
|
||||
@XmlElement(name = "update")
|
||||
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();
|
||||
|
||||
public QueueConfigsUpdateInfo() {
|
||||
// JAXB needs this
|
||||
}
|
||||
|
||||
public ArrayList<QueueConfigInfo> getAddQueueInfo() {
|
||||
return addQueueInfo;
|
||||
}
|
||||
|
||||
public ArrayList<String> getRemoveQueueInfo() {
|
||||
return removeQueueInfo;
|
||||
}
|
||||
|
||||
public ArrayList<QueueConfigInfo> getUpdateQueueInfo() {
|
||||
return updateQueueInfo;
|
||||
}
|
||||
}
|
|
@ -77,7 +77,11 @@ public class TestMutableCSConfigurationProvider {
|
|||
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
|
||||
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
|
||||
any(RMContext.class));
|
||||
confProvider.mutateConfiguration(TEST_USER, badUpdate);
|
||||
try {
|
||||
confProvider.mutateConfiguration(TEST_USER, badUpdate);
|
||||
} catch (IOException e) {
|
||||
// Expected exception.
|
||||
}
|
||||
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,477 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.api.json.JSONJAXBContext;
|
||||
import com.sun.jersey.api.json.JSONMarshaller;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
/**
|
||||
* Test scheduler configuration mutation via REST API.
|
||||
*/
|
||||
public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||
|
||||
private static final File CONF_FILE = new File(new File("target",
|
||||
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||
private static final File OLD_CONF_FILE = new File(new File("target",
|
||||
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
|
||||
|
||||
private static MockRM rm;
|
||||
private static String userName;
|
||||
private static CapacitySchedulerConfiguration csConf;
|
||||
private static YarnConfiguration conf;
|
||||
|
||||
private static class WebServletModule extends ServletModule {
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
bind(JAXBContextResolver.class);
|
||||
bind(RMWebServices.class);
|
||||
bind(GenericExceptionHandler.class);
|
||||
try {
|
||||
userName = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("Unable to get current user name "
|
||||
+ ioe.getMessage(), ioe);
|
||||
}
|
||||
csConf = new CapacitySchedulerConfiguration(new Configuration(false),
|
||||
false);
|
||||
setupQueueConfiguration(csConf);
|
||||
conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
|
||||
CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
|
||||
try {
|
||||
if (CONF_FILE.exists()) {
|
||||
if (!CONF_FILE.renameTo(OLD_CONF_FILE)) {
|
||||
throw new RuntimeException("Failed to rename conf file");
|
||||
}
|
||||
}
|
||||
FileOutputStream out = new FileOutputStream(CONF_FILE);
|
||||
csConf.writeXml(out);
|
||||
out.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to write XML file", e);
|
||||
}
|
||||
rm = new MockRM(conf);
|
||||
bind(ResourceManager.class).toInstance(rm);
|
||||
serve("/*").with(GuiceContainer.class);
|
||||
filter("/*").through(TestRMWebServicesAppsModification
|
||||
.TestRMCustomAuthFilter.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
GuiceServletConfig.setInjector(
|
||||
Guice.createInjector(new WebServletModule()));
|
||||
}
|
||||
|
||||
private static void setupQueueConfiguration(
|
||||
CapacitySchedulerConfiguration config) {
|
||||
config.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[]{"a", "b", "c"});
|
||||
|
||||
final String a = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
config.setCapacity(a, 25f);
|
||||
config.setMaximumCapacity(a, 50f);
|
||||
|
||||
final String a1 = a + ".a1";
|
||||
final String a2 = a + ".a2";
|
||||
config.setQueues(a, new String[]{"a1", "a2"});
|
||||
config.setCapacity(a1, 100f);
|
||||
config.setCapacity(a2, 0f);
|
||||
|
||||
final String b = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
config.setCapacity(b, 75f);
|
||||
|
||||
final String c = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
config.setCapacity(c, 0f);
|
||||
|
||||
final String c1 = c + ".c1";
|
||||
config.setQueues(c, new String[] {"c1"});
|
||||
config.setCapacity(c1, 0f);
|
||||
}
|
||||
|
||||
public TestRMWebServicesConfigurationMutation() {
|
||||
super(new WebAppDescriptor.Builder(
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
|
||||
.contextListenerClass(GuiceServletConfig.class)
|
||||
.filterClass(com.google.inject.servlet.GuiceFilter.class)
|
||||
.contextPath("jersey-guice-filter").servletPath("/").build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddNestedQueue() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
// Add parent queue root.d with two children d1 and d2.
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
Map<String, String> d1Capacity = new HashMap<>();
|
||||
d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
|
||||
d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25");
|
||||
Map<String, String> nearEmptyCapacity = new HashMap<>();
|
||||
nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4");
|
||||
nearEmptyCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY,
|
||||
"1E-4");
|
||||
Map<String, String> d2Capacity = new HashMap<>();
|
||||
d2Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "75");
|
||||
d2Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "75");
|
||||
QueueConfigInfo d1 = new QueueConfigInfo("root.d.d1", d1Capacity);
|
||||
QueueConfigInfo d2 = new QueueConfigInfo("root.d.d2", d2Capacity);
|
||||
QueueConfigInfo d = new QueueConfigInfo("root.d", nearEmptyCapacity);
|
||||
updateInfo.getAddQueueInfo().add(d1);
|
||||
updateInfo.getAddQueueInfo().add(d2);
|
||||
updateInfo.getAddQueueInfo().add(d);
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(4, newCSConf.getQueues("root").length);
|
||||
assertEquals(2, newCSConf.getQueues("root.d").length);
|
||||
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d1"),
|
||||
0.01f);
|
||||
assertEquals(75.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d2"),
|
||||
0.01f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddWithUpdate() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
// Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
Map<String, String> dCapacity = new HashMap<>();
|
||||
dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
|
||||
Map<String, String> bCapacity = new HashMap<>();
|
||||
bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "50");
|
||||
QueueConfigInfo d = new QueueConfigInfo("root.d", dCapacity);
|
||||
QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity);
|
||||
updateInfo.getAddQueueInfo().add(d);
|
||||
updateInfo.getUpdateQueueInfo().add(b);
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(4, newCSConf.getQueues("root").length);
|
||||
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d"), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveQueue() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
stopQueue("root.a.a2");
|
||||
// Remove root.a.a2
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
updateInfo.getRemoveQueueInfo().add("root.a.a2");
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(1, newCSConf.getQueues("root.a").length);
|
||||
assertEquals("a1", newCSConf.getQueues("root.a")[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveParentQueue() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
stopQueue("root.c", "root.c.c1");
|
||||
// Remove root.c (parent queue)
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
updateInfo.getRemoveQueueInfo().add("root.c");
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(2, newCSConf.getQueues("root").length);
|
||||
assertNull(newCSConf.getQueues("root.c"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveParentQueueWithCapacity() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
stopQueue("root.a", "root.a.a1", "root.a.a2");
|
||||
// Remove root.a (parent queue) with capacity 25
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
updateInfo.getRemoveQueueInfo().add("root.a");
|
||||
|
||||
// Set root.b capacity to 100
|
||||
Map<String, String> bCapacity = new HashMap<>();
|
||||
bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
|
||||
QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity);
|
||||
updateInfo.getUpdateQueueInfo().add(b);
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(2, newCSConf.getQueues("root").length);
|
||||
assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity("root.b"),
|
||||
0.01f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveMultipleQueues() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
stopQueue("root.b", "root.c", "root.c.c1");
|
||||
// Remove root.b and root.c
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
updateInfo.getRemoveQueueInfo().add("root.b");
|
||||
updateInfo.getRemoveQueueInfo().add("root.c");
|
||||
Map<String, String> aCapacity = new HashMap<>();
|
||||
aCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
|
||||
aCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "100");
|
||||
QueueConfigInfo configInfo = new QueueConfigInfo("root.a", aCapacity);
|
||||
updateInfo.getUpdateQueueInfo().add(configInfo);
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(1, newCSConf.getQueues("root").length);
|
||||
}
|
||||
|
||||
private void stopQueue(String... queuePaths) throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
// Set state of queues to STOPPED.
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
Map<String, String> stoppedParam = new HashMap<>();
|
||||
stoppedParam.put(CapacitySchedulerConfiguration.STATE,
|
||||
QueueState.STOPPED.toString());
|
||||
for (String queue : queuePaths) {
|
||||
QueueConfigInfo stoppedInfo = new QueueConfigInfo(queue, stoppedParam);
|
||||
updateInfo.getUpdateQueueInfo().add(stoppedInfo);
|
||||
}
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
for (String queue : queuePaths) {
|
||||
assertEquals(QueueState.STOPPED, newCSConf.getState(queue));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateQueue() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
// Update config value.
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
Map<String, String> updateParam = new HashMap<>();
|
||||
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
|
||||
"0.2");
|
||||
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
|
||||
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
assertEquals(CapacitySchedulerConfiguration
|
||||
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
|
||||
cs.getConfiguration()
|
||||
.getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
|
||||
0.001f);
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf = cs.getConfiguration();
|
||||
assertEquals(0.2f, newCSConf
|
||||
.getMaximumApplicationMasterResourcePerQueuePercent("root.a"), 0.001f);
|
||||
|
||||
// Remove config. Config value should be reverted to default.
|
||||
updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
|
||||
null);
|
||||
aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
|
||||
updateInfo.getUpdateQueueInfo().clear();
|
||||
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
newCSConf = cs.getConfiguration();
|
||||
assertEquals(CapacitySchedulerConfiguration
|
||||
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, newCSConf
|
||||
.getMaximumApplicationMasterResourcePerQueuePercent("root.a"),
|
||||
0.001f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateQueueCapacity() throws Exception {
|
||||
WebResource r = resource();
|
||||
|
||||
ClientResponse response;
|
||||
|
||||
// Update root.a and root.b capacity to 50.
|
||||
QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo();
|
||||
Map<String, String> updateParam = new HashMap<>();
|
||||
updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50");
|
||||
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
|
||||
QueueConfigInfo bUpdateInfo = new QueueConfigInfo("root.b", updateParam);
|
||||
updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
|
||||
updateInfo.getUpdateQueueInfo().add(bUpdateInfo);
|
||||
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("queues").queryParam("user.name", userName)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.entity(toJson(updateInfo, QueueConfigsUpdateInfo.class),
|
||||
MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.a"), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
CONF_FILE.delete();
|
||||
if (!OLD_CONF_FILE.renameTo(CONF_FILE)) {
|
||||
throw new RuntimeException("Failed to re-copy old configuration file");
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private String toJson(Object nsli, Class klass) throws Exception {
|
||||
StringWriter sw = new StringWriter();
|
||||
JSONJAXBContext ctx = new JSONJAXBContext(klass);
|
||||
JSONMarshaller jm = ctx.createJSONMarshaller();
|
||||
jm.marshallToJSON(nsli, sw);
|
||||
return sw.toString();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue