YARN-9864. Format CS Configuration present in Configuration Store. Contributeed by Prabhu Joseph
This commit is contained in:
parent
6529a30d9e
commit
71792f2122
|
@ -64,6 +64,16 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-annotations</artifactId>
|
<artifactId>hadoop-annotations</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.sun.jersey.jersey-test-framework</groupId>
|
||||||
|
<artifactId>jersey-test-framework-core</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.sun.jersey.jersey-test-framework</groupId>
|
||||||
|
<artifactId>jersey-test-framework-grizzly2</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-all</artifactId>
|
<artifactId>mockito-all</artifactId>
|
||||||
|
|
|
@ -25,6 +25,7 @@ import com.sun.jersey.api.client.ClientRequest;
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||||
import com.sun.jersey.api.client.WebResource;
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import com.sun.jersey.api.client.WebResource.Builder;
|
||||||
import com.sun.jersey.api.client.filter.ClientFilter;
|
import com.sun.jersey.api.client.filter.ClientFilter;
|
||||||
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||||
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||||
|
@ -155,6 +156,9 @@ public class LogsCLI extends Configured implements Tool {
|
||||||
if (yarnClient != null) {
|
if (yarnClient != null) {
|
||||||
yarnClient.close();
|
yarnClient.close();
|
||||||
}
|
}
|
||||||
|
if (webServiceClient != null) {
|
||||||
|
webServiceClient.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -418,24 +422,34 @@ public class LogsCLI extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<JSONObject> getAMContainerInfoForRMWebService(
|
protected List<JSONObject> getAMContainerInfoForRMWebService(
|
||||||
Configuration conf, String appId) throws ClientHandlerException,
|
Configuration conf, String appId) throws Exception {
|
||||||
|
return WebAppUtils.execOnActiveRM(conf, this::getAMContainerInfoFromRM,
|
||||||
|
appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<JSONObject> getAMContainerInfoFromRM(
|
||||||
|
String webAppAddress, String appId) throws ClientHandlerException,
|
||||||
UniformInterfaceException, JSONException {
|
UniformInterfaceException, JSONException {
|
||||||
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
|
|
||||||
|
|
||||||
WebResource webResource = webServiceClient.resource(webAppAddress);
|
|
||||||
|
|
||||||
ClientResponse response =
|
|
||||||
webResource.path("ws").path("v1").path("cluster").path("apps")
|
|
||||||
.path(appId).path("appattempts").accept(MediaType.APPLICATION_JSON)
|
|
||||||
.get(ClientResponse.class);
|
|
||||||
JSONObject json =
|
|
||||||
response.getEntity(JSONObject.class).getJSONObject("appAttempts");
|
|
||||||
JSONArray requests = json.getJSONArray("appAttempt");
|
|
||||||
List<JSONObject> amContainersList = new ArrayList<JSONObject>();
|
List<JSONObject> amContainersList = new ArrayList<JSONObject>();
|
||||||
for (int i = 0; i < requests.length(); i++) {
|
ClientResponse response = null;
|
||||||
amContainersList.add(requests.getJSONObject(i));
|
try {
|
||||||
|
Builder builder = webServiceClient.resource(webAppAddress)
|
||||||
|
.path("ws").path("v1").path("cluster")
|
||||||
|
.path("apps").path(appId).path("appattempts")
|
||||||
|
.accept(MediaType.APPLICATION_JSON);
|
||||||
|
response = builder.get(ClientResponse.class);
|
||||||
|
JSONObject json = response.getEntity(JSONObject.class)
|
||||||
|
.getJSONObject("appAttempts");
|
||||||
|
JSONArray requests = json.getJSONArray("appAttempt");
|
||||||
|
for (int j = 0; j < requests.length(); j++) {
|
||||||
|
amContainersList.add(requests.getJSONObject(j));
|
||||||
}
|
}
|
||||||
return amContainersList;
|
return amContainersList;
|
||||||
|
} finally {
|
||||||
|
if (response != null) {
|
||||||
|
response.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<JSONObject> getAMContainerInfoForAHSWebService(
|
private List<JSONObject> getAMContainerInfoForAHSWebService(
|
||||||
|
|
|
@ -22,13 +22,16 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.sun.jersey.api.client.Client;
|
import com.sun.jersey.api.client.Client;
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
import com.sun.jersey.api.client.WebResource;
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import com.sun.jersey.api.client.WebResource.Builder;
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.GnuParser;
|
import org.apache.commons.cli.GnuParser;
|
||||||
import org.apache.commons.cli.MissingArgumentException;
|
import org.apache.commons.cli.MissingArgumentException;
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
||||||
|
@ -55,6 +58,7 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
private static final String REMOVE_QUEUES_OPTION = "removeQueues";
|
private static final String REMOVE_QUEUES_OPTION = "removeQueues";
|
||||||
private static final String UPDATE_QUEUES_OPTION = "updateQueues";
|
private static final String UPDATE_QUEUES_OPTION = "updateQueues";
|
||||||
private static final String GLOBAL_OPTIONS = "globalUpdates";
|
private static final String GLOBAL_OPTIONS = "globalUpdates";
|
||||||
|
private static final String FORMAT_CONF = "formatConfig";
|
||||||
private static final String HELP_CMD = "help";
|
private static final String HELP_CMD = "help";
|
||||||
|
|
||||||
private static final String CONF_ERR_MSG = "Specify configuration key " +
|
private static final String CONF_ERR_MSG = "Specify configuration key " +
|
||||||
|
@ -82,6 +86,9 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
"Update queue configurations");
|
"Update queue configurations");
|
||||||
opts.addOption("global", GLOBAL_OPTIONS, true,
|
opts.addOption("global", GLOBAL_OPTIONS, true,
|
||||||
"Update global scheduler configurations");
|
"Update global scheduler configurations");
|
||||||
|
opts.addOption("format", FORMAT_CONF, false,
|
||||||
|
"Format Scheduler Configuration and reload from" +
|
||||||
|
" capacity-scheduler.xml");
|
||||||
opts.addOption("h", HELP_CMD, false, "Displays help for all commands.");
|
opts.addOption("h", HELP_CMD, false, "Displays help for all commands.");
|
||||||
|
|
||||||
int exitCode = -1;
|
int exitCode = -1;
|
||||||
|
@ -100,6 +107,7 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasOption = false;
|
boolean hasOption = false;
|
||||||
|
boolean format = false;
|
||||||
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
try {
|
try {
|
||||||
if (parsedCli.hasOption(ADD_QUEUES_OPTION)) {
|
if (parsedCli.hasOption(ADD_QUEUES_OPTION)) {
|
||||||
|
@ -120,6 +128,11 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
hasOption = true;
|
hasOption = true;
|
||||||
globalUpdates(parsedCli.getOptionValue(GLOBAL_OPTIONS), updateInfo);
|
globalUpdates(parsedCli.getOptionValue(GLOBAL_OPTIONS), updateInfo);
|
||||||
}
|
}
|
||||||
|
if (parsedCli.hasOption((FORMAT_CONF))) {
|
||||||
|
hasOption = true;
|
||||||
|
format = true;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
System.err.println(e.getMessage());
|
System.err.println(e.getMessage());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -131,14 +144,82 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Configuration conf = getConf();
|
||||||
|
if (format) {
|
||||||
|
return WebAppUtils.execOnActiveRM(conf, this::formatSchedulerConf, null);
|
||||||
|
} else {
|
||||||
|
return WebAppUtils.execOnActiveRM(conf,
|
||||||
|
this::updateSchedulerConfOnRMNode, updateInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int formatSchedulerConf(String webAppAddress, WebResource resource)
|
||||||
|
throws Exception {
|
||||||
Client webServiceClient = Client.create();
|
Client webServiceClient = Client.create();
|
||||||
WebResource webResource = webServiceClient.resource(WebAppUtils.
|
ClientResponse response = null;
|
||||||
getRMWebAppURLWithScheme(getConf()));
|
resource = (resource != null) ? resource :
|
||||||
ClientResponse response = webResource.path("ws").path("v1").path("cluster")
|
webServiceClient.resource(webAppAddress);
|
||||||
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON)
|
|
||||||
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
try {
|
||||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
Builder builder = null;
|
||||||
.put(ClientResponse.class);
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
builder = resource
|
||||||
|
.path("ws").path("v1").path("cluster")
|
||||||
|
.path("/scheduler-conf/format")
|
||||||
|
.accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON);
|
||||||
|
} else {
|
||||||
|
builder = resource
|
||||||
|
.path("ws").path("v1").path("cluster")
|
||||||
|
.path("/scheduler-conf/format").queryParam("user.name",
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName())
|
||||||
|
.accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON);
|
||||||
|
}
|
||||||
|
|
||||||
|
response = builder.get(ClientResponse.class);
|
||||||
|
if (response != null) {
|
||||||
|
if (response.getStatus() == Status.OK.getStatusCode()) {
|
||||||
|
System.out.println(response.getEntity(String.class));
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
System.err.println("Failed to format scheduler configuration: " +
|
||||||
|
response.getEntity(String.class));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
System.err.println("Failed to format scheduler configuration: " +
|
||||||
|
"null response");
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
} finally {
|
||||||
|
if (response != null) {
|
||||||
|
response.close();
|
||||||
|
}
|
||||||
|
webServiceClient.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int updateSchedulerConfOnRMNode(String webAppAddress,
|
||||||
|
SchedConfUpdateInfo updateInfo) throws Exception {
|
||||||
|
Client webServiceClient = Client.create();
|
||||||
|
ClientResponse response = null;
|
||||||
|
WebResource resource = webServiceClient.resource(webAppAddress);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Builder builder = null;
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
builder = resource.path("ws").path("v1").path("cluster")
|
||||||
|
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
|
||||||
|
} else {
|
||||||
|
builder = resource.path("ws").path("v1").path("cluster")
|
||||||
|
.queryParam("user.name",
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName())
|
||||||
|
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||||
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON);
|
||||||
|
response = builder.put(ClientResponse.class);
|
||||||
if (response != null) {
|
if (response != null) {
|
||||||
if (response.getStatus() == Status.OK.getStatusCode()) {
|
if (response.getStatus() == Status.OK.getStatusCode()) {
|
||||||
System.out.println("Configuration changed successfully.");
|
System.out.println("Configuration changed successfully.");
|
||||||
|
@ -151,7 +232,14 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
System.err.println("Configuration change unsuccessful: null response");
|
System.err.println("Configuration change unsuccessful: null response");
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
} finally {
|
||||||
|
if (response != null) {
|
||||||
|
response.close();
|
||||||
}
|
}
|
||||||
|
webServiceClient.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void addQueues(String args, SchedConfUpdateInfo updateInfo) {
|
void addQueues(String args, SchedConfUpdateInfo updateInfo) {
|
||||||
|
@ -237,7 +325,8 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
+ "[-remove \"queueRemovePath1;queueRemovePath2\"] "
|
+ "[-remove \"queueRemovePath1;queueRemovePath2\"] "
|
||||||
+ "[-update \"queueUpdatePath1:confKey1=confVal1\"] "
|
+ "[-update \"queueUpdatePath1:confKey1=confVal1\"] "
|
||||||
+ "[-global globalConfKey1=globalConfVal1,"
|
+ "[-global globalConfKey1=globalConfVal1,"
|
||||||
+ "globalConfKey2=globalConfVal2]\n"
|
+ "globalConfKey2=globalConfVal2] "
|
||||||
|
+ "[-format]\n"
|
||||||
+ "Example (adding queues): yarn schedulerconf -add "
|
+ "Example (adding queues): yarn schedulerconf -add "
|
||||||
+ "\"root.a.a1:capacity=100,maximum-capacity=100;root.a.a2:capacity=0,"
|
+ "\"root.a.a1:capacity=100,maximum-capacity=100;root.a.a2:capacity=0,"
|
||||||
+ "maximum-capacity=0\"\n"
|
+ "maximum-capacity=0\"\n"
|
||||||
|
@ -248,6 +337,8 @@ public class SchedConfCLI extends Configured implements Tool {
|
||||||
+ "maximum-capacity=75\"\n"
|
+ "maximum-capacity=75\"\n"
|
||||||
+ "Example (global scheduler update): yarn schedulerconf "
|
+ "Example (global scheduler update): yarn schedulerconf "
|
||||||
+ "-global yarn.scheduler.capacity.maximum-applications=10000\n"
|
+ "-global yarn.scheduler.capacity.maximum-applications=10000\n"
|
||||||
|
+ "Example (format scheduler configuration): yarn schedulerconf "
|
||||||
|
+ "-format\n"
|
||||||
+ "Note: This is an alpha feature, the syntax/options are subject to "
|
+ "Note: This is an alpha feature, the syntax/options are subject to "
|
||||||
+ "change, please run at your own risk.");
|
+ "change, please run at your own risk.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,12 +22,48 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.security.Principal;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.google.inject.Guice;
|
||||||
|
import com.google.inject.Singleton;
|
||||||
|
import com.google.inject.servlet.ServletModule;
|
||||||
|
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||||
|
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
|
||||||
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
|
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||||
|
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
|
import javax.servlet.FilterChain;
|
||||||
|
import javax.servlet.FilterConfig;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.ServletResponse;
|
||||||
|
import javax.servlet.ServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletRequestWrapper;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -35,7 +71,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
/**
|
/**
|
||||||
* Class for testing {@link SchedConfCLI}.
|
* Class for testing {@link SchedConfCLI}.
|
||||||
*/
|
*/
|
||||||
public class TestSchedConfCLI {
|
public class TestSchedConfCLI extends JerseyTestBase {
|
||||||
|
|
||||||
private ByteArrayOutputStream sysOutStream;
|
private ByteArrayOutputStream sysOutStream;
|
||||||
private PrintStream sysOut;
|
private PrintStream sysOut;
|
||||||
|
@ -45,6 +81,23 @@ public class TestSchedConfCLI {
|
||||||
|
|
||||||
private SchedConfCLI cli;
|
private SchedConfCLI cli;
|
||||||
|
|
||||||
|
private static MockRM rm;
|
||||||
|
private static String userName;
|
||||||
|
private static CapacitySchedulerConfiguration csConf;
|
||||||
|
|
||||||
|
private static final File CONF_FILE = new File(new File("target",
|
||||||
|
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||||
|
private static final File OLD_CONF_FILE = new File(new File("target",
|
||||||
|
"test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
|
||||||
|
|
||||||
|
public TestSchedConfCLI() {
|
||||||
|
super(new WebAppDescriptor.Builder(
|
||||||
|
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
|
||||||
|
.contextListenerClass(GuiceServletConfig.class)
|
||||||
|
.filterClass(com.google.inject.servlet.GuiceFilter.class)
|
||||||
|
.contextPath("jersey-guice-filter").servletPath("/").build());
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
sysOutStream = new ByteArrayOutputStream();
|
sysOutStream = new ByteArrayOutputStream();
|
||||||
|
@ -58,6 +111,138 @@ public class TestSchedConfCLI {
|
||||||
cli = new SchedConfCLI();
|
cli = new SchedConfCLI();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class WebServletModule extends ServletModule {
|
||||||
|
@Override
|
||||||
|
protected void configureServlets() {
|
||||||
|
bind(JAXBContextResolver.class);
|
||||||
|
bind(RMWebServices.class);
|
||||||
|
bind(GenericExceptionHandler.class);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
|
||||||
|
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
userName = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new RuntimeException("Unable to get current user name "
|
||||||
|
+ ioe.getMessage(), ioe);
|
||||||
|
}
|
||||||
|
|
||||||
|
csConf = new CapacitySchedulerConfiguration(new Configuration(false),
|
||||||
|
false);
|
||||||
|
setupQueueConfiguration(csConf);
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (CONF_FILE.exists()) {
|
||||||
|
if (!CONF_FILE.renameTo(OLD_CONF_FILE)) {
|
||||||
|
throw new RuntimeException("Failed to rename conf file");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FileOutputStream out = new FileOutputStream(CONF_FILE);
|
||||||
|
csConf.writeXml(out);
|
||||||
|
out.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Failed to write XML file", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
bind(ResourceManager.class).toInstance(rm);
|
||||||
|
serve("/*").with(GuiceContainer.class);
|
||||||
|
filter("/*").through(TestRMCustomAuthFilter.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Custom filter which sets the Remote User for testing purpose.
|
||||||
|
*/
|
||||||
|
@Singleton
|
||||||
|
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
|
||||||
|
@Override
|
||||||
|
public void init(FilterConfig filterConfig) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doFilter(ServletRequest request, ServletResponse response,
|
||||||
|
FilterChain filterChain) throws IOException, ServletException {
|
||||||
|
HttpServletRequest httpRequest = (HttpServletRequest)request;
|
||||||
|
HttpServletResponse httpResponse = (HttpServletResponse) response;
|
||||||
|
httpRequest = new HttpServletRequestWrapper(httpRequest) {
|
||||||
|
public String getAuthType() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRemoteUser() {
|
||||||
|
return userName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Principal getUserPrincipal() {
|
||||||
|
return new Principal() {
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return userName;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
doFilter(filterChain, httpRequest, httpResponse);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setupQueueConfiguration(
|
||||||
|
CapacitySchedulerConfiguration config) {
|
||||||
|
config.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[]{"testqueue"});
|
||||||
|
String a = CapacitySchedulerConfiguration.ROOT + ".testqueue";
|
||||||
|
config.setCapacity(a, 100f);
|
||||||
|
config.setMaximumCapacity(a, 100f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 100000)
|
||||||
|
public void testFormatSchedulerConf() throws Exception {
|
||||||
|
try {
|
||||||
|
super.setUp();
|
||||||
|
GuiceServletConfig.setInjector(
|
||||||
|
Guice.createInjector(new WebServletModule()));
|
||||||
|
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||||
|
MutableConfigurationProvider provider =
|
||||||
|
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||||
|
|
||||||
|
SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
|
||||||
|
HashMap<String, String> globalUpdates = new HashMap<>();
|
||||||
|
globalUpdates.put("schedKey1", "schedVal1");
|
||||||
|
schedUpdateInfo.setGlobalParams(globalUpdates);
|
||||||
|
|
||||||
|
provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
|
||||||
|
schedUpdateInfo);
|
||||||
|
rm.getRMContext().getRMAdminService().refreshQueues();
|
||||||
|
provider.confirmPendingMutation(true);
|
||||||
|
|
||||||
|
Configuration schedulerConf = provider.getConfiguration();
|
||||||
|
assertEquals("schedVal1", schedulerConf.get("schedKey1"));
|
||||||
|
|
||||||
|
int exitCode = cli.formatSchedulerConf("", resource());
|
||||||
|
assertEquals(0, exitCode);
|
||||||
|
|
||||||
|
schedulerConf = provider.getConfiguration();
|
||||||
|
assertNull(schedulerConf.get("schedKey1"));
|
||||||
|
} finally {
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
CONF_FILE.delete();
|
||||||
|
if (OLD_CONF_FILE.exists()) {
|
||||||
|
if (!OLD_CONF_FILE.renameTo(CONF_FILE)) {
|
||||||
|
throw new RuntimeException("Failed to re-copy old" +
|
||||||
|
" configuration file");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testInvalidConf() throws Exception {
|
public void testInvalidConf() throws Exception {
|
||||||
// conf pair with no key should be invalid
|
// conf pair with no key should be invalid
|
||||||
|
|
|
@ -90,8 +90,41 @@ public class WebAppUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs a certain function against the active RM. The function's first
|
||||||
|
* argument is expected to be a string which contains the address of
|
||||||
|
* the RM being tried.
|
||||||
|
*/
|
||||||
|
public static <T, R> R execOnActiveRM(Configuration conf,
|
||||||
|
ThrowingBiFunction<String, T, R> func, T arg) throws Exception {
|
||||||
|
String rm1Address = getRMWebAppURLWithScheme(conf, 0);
|
||||||
|
try {
|
||||||
|
return func.apply(rm1Address, arg);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
|
int rms = HAUtil.getRMHAIds(conf).size();
|
||||||
|
for (int i=1; i<rms; i++) {
|
||||||
|
try {
|
||||||
|
rm1Address = getRMWebAppURLWithScheme(conf, i);
|
||||||
|
return func.apply(rm1Address, arg);
|
||||||
|
} catch (Exception e1) {
|
||||||
|
// ignore and try next one when RM is down
|
||||||
|
e = e1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A BiFunction which throws on Exception. */
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ThrowingBiFunction<T, U, R> {
|
||||||
|
R apply(T t, U u) throws Exception;
|
||||||
|
}
|
||||||
|
|
||||||
public static String getRMWebAppURLWithoutScheme(Configuration conf,
|
public static String getRMWebAppURLWithoutScheme(Configuration conf,
|
||||||
boolean isHAEnabled) {
|
boolean isHAEnabled, int haIdIndex) {
|
||||||
YarnConfiguration yarnConfig = new YarnConfiguration(conf);
|
YarnConfiguration yarnConfig = new YarnConfiguration(conf);
|
||||||
// set RM_ID if we have not configure it.
|
// set RM_ID if we have not configure it.
|
||||||
if (isHAEnabled) {
|
if (isHAEnabled) {
|
||||||
|
@ -99,7 +132,7 @@ public class WebAppUtils {
|
||||||
if (rmId == null || rmId.isEmpty()) {
|
if (rmId == null || rmId.isEmpty()) {
|
||||||
List<String> rmIds = new ArrayList<>(HAUtil.getRMHAIds(conf));
|
List<String> rmIds = new ArrayList<>(HAUtil.getRMHAIds(conf));
|
||||||
if (rmIds != null && !rmIds.isEmpty()) {
|
if (rmIds != null && !rmIds.isEmpty()) {
|
||||||
yarnConfig.set(YarnConfiguration.RM_HA_ID, rmIds.get(0));
|
yarnConfig.set(YarnConfiguration.RM_HA_ID, rmIds.get(haIdIndex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,13 +153,19 @@ public class WebAppUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String getRMWebAppURLWithScheme(Configuration conf,
|
||||||
|
int haIdIndex) {
|
||||||
|
return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme(
|
||||||
|
conf, HAUtil.isHAEnabled(conf), haIdIndex);
|
||||||
|
}
|
||||||
|
|
||||||
public static String getRMWebAppURLWithScheme(Configuration conf) {
|
public static String getRMWebAppURLWithScheme(Configuration conf) {
|
||||||
return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme(
|
return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme(
|
||||||
conf, HAUtil.isHAEnabled(conf));
|
conf, HAUtil.isHAEnabled(conf), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRMWebAppURLWithoutScheme(Configuration conf) {
|
public static String getRMWebAppURLWithoutScheme(Configuration conf) {
|
||||||
return getRMWebAppURLWithoutScheme(conf, false);
|
return getRMWebAppURLWithoutScheme(conf, false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRouterWebAppURLWithScheme(Configuration conf) {
|
public static String getRouterWebAppURLWithScheme(Configuration conf) {
|
||||||
|
|
|
@ -21,7 +21,7 @@ import com.sun.jersey.api.client.Client;
|
||||||
import com.sun.jersey.api.client.ClientHandlerException;
|
import com.sun.jersey.api.client.ClientHandlerException;
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||||
import com.sun.jersey.api.client.WebResource;
|
import com.sun.jersey.api.client.WebResource.Builder;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
|
||||||
import com.sun.jersey.api.json.JSONJAXBContext;
|
import com.sun.jersey.api.json.JSONJAXBContext;
|
||||||
|
@ -53,16 +53,36 @@ public final class YarnWebServiceUtils {
|
||||||
public static JSONObject getNodeInfoFromRMWebService(Configuration conf,
|
public static JSONObject getNodeInfoFromRMWebService(Configuration conf,
|
||||||
String nodeId) throws ClientHandlerException,
|
String nodeId) throws ClientHandlerException,
|
||||||
UniformInterfaceException {
|
UniformInterfaceException {
|
||||||
|
try {
|
||||||
|
return WebAppUtils.execOnActiveRM(conf,
|
||||||
|
YarnWebServiceUtils::getNodeInfoFromRM, nodeId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (e instanceof ClientHandlerException) {
|
||||||
|
throw ((ClientHandlerException) e);
|
||||||
|
} else if (e instanceof UniformInterfaceException) {
|
||||||
|
throw ((UniformInterfaceException) e);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JSONObject getNodeInfoFromRM(String webAppAddress,
|
||||||
|
String nodeId) throws ClientHandlerException, UniformInterfaceException {
|
||||||
Client webServiceClient = Client.create();
|
Client webServiceClient = Client.create();
|
||||||
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
|
ClientResponse response = null;
|
||||||
|
try {
|
||||||
WebResource webResource = webServiceClient.resource(webAppAddress);
|
Builder builder = webServiceClient.resource(webAppAddress)
|
||||||
|
.path("ws").path("v1").path("cluster")
|
||||||
ClientResponse response = webResource.path("ws").path("v1")
|
.path("nodes").path(nodeId).accept(MediaType.APPLICATION_JSON);
|
||||||
.path("cluster").path("nodes")
|
response = builder.get(ClientResponse.class);
|
||||||
.path(nodeId).accept(MediaType.APPLICATION_JSON)
|
|
||||||
.get(ClientResponse.class);
|
|
||||||
return response.getEntity(JSONObject.class);
|
return response.getEntity(JSONObject.class);
|
||||||
|
} finally {
|
||||||
|
if (response != null) {
|
||||||
|
response.close();
|
||||||
|
}
|
||||||
|
webServiceClient.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
|
|
|
@ -56,6 +56,12 @@ public class TestYarnConfiguration {
|
||||||
conf2.set("yarn.resourcemanager.hostname.rm2", "40.40.40.40");
|
conf2.set("yarn.resourcemanager.hostname.rm2", "40.40.40.40");
|
||||||
String rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2);
|
String rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2);
|
||||||
Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2);
|
Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2);
|
||||||
|
|
||||||
|
rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2, 0);
|
||||||
|
Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2);
|
||||||
|
|
||||||
|
rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2, 1);
|
||||||
|
Assert.assertEquals("http://40.40.40.40:8088", rmWebUrlinHA2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -65,6 +65,8 @@ public interface MutableConfigurationProvider {
|
||||||
*/
|
*/
|
||||||
Configuration getConfiguration();
|
Configuration getConfiguration();
|
||||||
|
|
||||||
|
void formatConfigurationInStore(Configuration conf) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the configuration provider, releasing any required resources.
|
* Closes the configuration provider, releasing any required resources.
|
||||||
* @throws IOException on failure to close
|
* @throws IOException on failure to close
|
||||||
|
|
|
@ -60,6 +60,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
|
||||||
pendingMutation = null;
|
pendingMutation = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void format() {
|
||||||
|
this.schedConf = null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Configuration retrieve() {
|
public synchronized Configuration retrieve() {
|
||||||
return schedConf;
|
return schedConf;
|
||||||
|
|
|
@ -98,6 +98,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void format() throws Exception {
|
||||||
|
close();
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
fs.delete(getStorageDir(), true);
|
||||||
|
}
|
||||||
|
|
||||||
private void initDatabase(Configuration config) throws Exception {
|
private void initDatabase(Configuration config) throws Exception {
|
||||||
Path storeRoot = createStorageDir();
|
Path storeRoot = createStorageDir();
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
|
|
|
@ -40,6 +40,7 @@ import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CS configuration provider which implements
|
* CS configuration provider which implements
|
||||||
|
@ -58,6 +59,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
private ConfigurationMutationACLPolicy aclMutationPolicy;
|
private ConfigurationMutationACLPolicy aclMutationPolicy;
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
|
|
||||||
|
private final ReentrantReadWriteLock formatLock =
|
||||||
|
new ReentrantReadWriteLock();
|
||||||
|
|
||||||
public MutableCSConfigurationProvider(RMContext rmContext) {
|
public MutableCSConfigurationProvider(RMContext rmContext) {
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
@ -148,17 +152,51 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void formatConfigurationInStore(Configuration config)
|
||||||
|
throws Exception {
|
||||||
|
formatLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
confStore.format();
|
||||||
|
Configuration initialSchedConf = new Configuration(false);
|
||||||
|
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||||
|
this.schedConf = new Configuration(false);
|
||||||
|
// We need to explicitly set the key-values in schedConf, otherwise
|
||||||
|
// these configuration keys cannot be deleted when
|
||||||
|
// configuration is reloaded.
|
||||||
|
for (Map.Entry<String, String> kv : initialSchedConf) {
|
||||||
|
schedConf.set(kv.getKey(), kv.getValue());
|
||||||
|
}
|
||||||
|
confStore.initialize(config, schedConf, rmContext);
|
||||||
|
confStore.checkVersion();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} finally {
|
||||||
|
formatLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void confirmPendingMutation(boolean isValid) throws Exception {
|
public void confirmPendingMutation(boolean isValid) throws Exception {
|
||||||
|
formatLock.readLock().lock();
|
||||||
|
try {
|
||||||
confStore.confirmMutation(isValid);
|
confStore.confirmMutation(isValid);
|
||||||
if (!isValid) {
|
if (!isValid) {
|
||||||
schedConf = oldConf;
|
schedConf = oldConf;
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
formatLock.readLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reloadConfigurationFromStore() throws Exception {
|
public void reloadConfigurationFromStore() throws Exception {
|
||||||
|
formatLock.readLock().lock();
|
||||||
|
try {
|
||||||
schedConf = confStore.retrieve();
|
schedConf = confStore.retrieve();
|
||||||
|
} finally {
|
||||||
|
formatLock.readLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> getSiblingQueues(String queuePath, Configuration conf) {
|
private List<String> getSiblingQueues(String queuePath, Configuration conf) {
|
||||||
|
|
|
@ -125,6 +125,12 @@ public abstract class YarnConfigurationStore {
|
||||||
*/
|
*/
|
||||||
public abstract Configuration retrieve();
|
public abstract Configuration retrieve();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Format the persisted configuration.
|
||||||
|
* @throws IOException on failure to format
|
||||||
|
*/
|
||||||
|
public abstract void format() throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a list of confirmed configuration mutations starting from a given id.
|
* Get a list of confirmed configuration mutations starting from a given id.
|
||||||
* @param fromId id from which to start getting mutations, inclusive
|
* @param fromId id from which to start getting mutations, inclusive
|
||||||
|
|
|
@ -132,6 +132,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void format() throws Exception {
|
||||||
|
zkManager.delete(confStorePath);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeVersion() throws Exception {
|
public synchronized void storeVersion() throws Exception {
|
||||||
byte[] data =
|
byte[] data =
|
||||||
|
|
|
@ -45,6 +45,12 @@ public final class RMWSConsts {
|
||||||
/** Path for {@code RMWebServiceProtocol#getSchedulerInfo}. */
|
/** Path for {@code RMWebServiceProtocol#getSchedulerInfo}. */
|
||||||
public static final String SCHEDULER = "/scheduler";
|
public static final String SCHEDULER = "/scheduler";
|
||||||
|
|
||||||
|
/** Path for {@code RMWebServices#updateSchedulerConfiguration}. */
|
||||||
|
public static final String SCHEDULER_CONF = "/scheduler-conf";
|
||||||
|
|
||||||
|
/** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
|
||||||
|
public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
|
||||||
|
|
||||||
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
|
||||||
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
public static final String SCHEDULER_LOGS = "/scheduler/logs";
|
||||||
|
|
||||||
|
|
|
@ -853,7 +853,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path("/apps/{appid}/appattempts/{appattemptid}/containers/{containerid}")
|
@Path(RMWSConsts.GET_CONTAINER)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
@Override
|
@Override
|
||||||
|
@ -867,7 +867,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path("/apps/{appid}/state")
|
@Path(RMWSConsts.APPS_APPID_STATE)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
@Override
|
@Override
|
||||||
|
@ -2320,7 +2320,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
@PUT
|
@PUT
|
||||||
@Path("/scheduler-conf")
|
@Path(RMWSConsts.SCHEDULER_CONF)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
|
@ -2372,7 +2372,39 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
@Path("/scheduler-conf")
|
@Path(RMWSConsts.FORMAT_SCHEDULER_CONF)
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException {
|
||||||
|
// Only admin user allowed to format scheduler conf in configuration store
|
||||||
|
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
|
||||||
|
initForWritableEndpoints(callerUGI, true);
|
||||||
|
|
||||||
|
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||||
|
if (scheduler instanceof MutableConfScheduler
|
||||||
|
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
||||||
|
try {
|
||||||
|
MutableConfigurationProvider mutableConfigurationProvider =
|
||||||
|
((MutableConfScheduler) scheduler).getMutableConfProvider();
|
||||||
|
mutableConfigurationProvider.formatConfigurationInStore(conf);
|
||||||
|
return Response.status(Status.OK).entity("Configuration under " +
|
||||||
|
"store successfully formatted.").build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Exception thrown when formating configuration", e);
|
||||||
|
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Response.status(Status.BAD_REQUEST)
|
||||||
|
.entity("Configuration change only supported by " +
|
||||||
|
"MutableConfScheduler.").build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Path(RMWSConsts.SCHEDULER_CONF)
|
||||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
public Response getSchedulerConfiguration(@Context HttpServletRequest hsr)
|
public Response getSchedulerConfiguration(@Context HttpServletRequest hsr)
|
||||||
|
|
|
@ -98,5 +98,9 @@ public class TestMutableCSConfigurationProvider {
|
||||||
confProvider.confirmPendingMutation(false);
|
confProvider.confirmPendingMutation(false);
|
||||||
assertNull(confProvider.loadConfiguration(conf).get(
|
assertNull(confProvider.loadConfiguration(conf).get(
|
||||||
"yarn.scheduler.capacity.root.a.badKey"));
|
"yarn.scheduler.capacity.root.a.badKey"));
|
||||||
|
|
||||||
|
confProvider.formatConfigurationInStore(conf);
|
||||||
|
assertNull(confProvider.loadConfiguration(conf)
|
||||||
|
.get("yarn.scheduler.capacity.root.a.goodKey"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,6 +128,15 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFormatConfiguration() throws Exception {
|
||||||
|
schedConf.set("key", "val");
|
||||||
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
assertEquals("val", confStore.retrieve().get("key"));
|
||||||
|
confStore.format();
|
||||||
|
assertNull(confStore.retrieve());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistUpdatedConfiguration() throws Exception {
|
public void testPersistUpdatedConfiguration() throws Exception {
|
||||||
confStore.initialize(conf, schedConf, rmContext);
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
|
|
@ -185,6 +185,19 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
assertEquals(3, orgConf.getQueues("root").length);
|
assertEquals(3, orgConf.getQueues("root").length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFormatSchedulerConf() throws Exception {
|
||||||
|
testAddNestedQueue();
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.queryParam("user.name", userName)
|
||||||
|
.path(RMWSConsts.FORMAT_SCHEDULER_CONF)
|
||||||
|
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
CapacitySchedulerConfiguration orgConf = getSchedulerConf();
|
||||||
|
assertEquals(3, orgConf.getQueues("root").length);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddNestedQueue() throws Exception {
|
public void testAddNestedQueue() throws Exception {
|
||||||
CapacitySchedulerConfiguration orgConf = getSchedulerConf();
|
CapacitySchedulerConfiguration orgConf = getSchedulerConf();
|
||||||
|
|
Loading…
Reference in New Issue