diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index c21c3814a1f..5568734c225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -65,6 +65,16 @@
mockito-all
test
+
+ com.sun.jersey.jersey-test-framework
+ jersey-test-framework-core
+ test
+
+
+ com.sun.jersey.jersey-test-framework
+ jersey-test-framework-grizzly2
+ test
+
org.apache.hadoop
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java
index be54553a0c8..daf4addd720 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.client.cli;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -30,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
@@ -56,6 +58,7 @@ public class SchedConfCLI extends Configured implements Tool {
private static final String REMOVE_QUEUES_OPTION = "removeQueues";
private static final String UPDATE_QUEUES_OPTION = "updateQueues";
private static final String GLOBAL_OPTIONS = "globalUpdates";
+ private static final String FORMAT_CONF = "formatConfig";
private static final String HELP_CMD = "help";
private static final String CONF_ERR_MSG = "Specify configuration key " +
@@ -83,6 +86,9 @@ public class SchedConfCLI extends Configured implements Tool {
"Update queue configurations");
opts.addOption("global", GLOBAL_OPTIONS, true,
"Update global scheduler configurations");
+ opts.addOption("format", FORMAT_CONF, false,
+ "Format Scheduler Configuration and reload from" +
+ " capacity-scheduler.xml");
opts.addOption("h", HELP_CMD, false, "Displays help for all commands.");
int exitCode = -1;
@@ -101,6 +107,7 @@ public class SchedConfCLI extends Configured implements Tool {
}
boolean hasOption = false;
+ boolean format = false;
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
try {
if (parsedCli.hasOption(ADD_QUEUES_OPTION)) {
@@ -121,6 +128,11 @@ public class SchedConfCLI extends Configured implements Tool {
hasOption = true;
globalUpdates(parsedCli.getOptionValue(GLOBAL_OPTIONS), updateInfo);
}
+ if (parsedCli.hasOption((FORMAT_CONF))) {
+ hasOption = true;
+ format = true;
+ }
+
} catch (IllegalArgumentException e) {
System.err.println(e.getMessage());
return -1;
@@ -133,18 +145,78 @@ public class SchedConfCLI extends Configured implements Tool {
}
Configuration conf = getConf();
- return WebAppUtils.execOnActiveRM(conf,
- this::updateSchedulerConfOnRMNode, updateInfo);
+ if (format) {
+ return WebAppUtils.execOnActiveRM(conf, this::formatSchedulerConf, null);
+ } else {
+ return WebAppUtils.execOnActiveRM(conf,
+ this::updateSchedulerConfOnRMNode, updateInfo);
+ }
}
- private int updateSchedulerConfOnRMNode(String webAppAddress,
+ @VisibleForTesting
+ int formatSchedulerConf(String webAppAddress, WebResource resource)
+ throws Exception {
+ Client webServiceClient = Client.create();
+ ClientResponse response = null;
+ resource = (resource != null) ? resource :
+ webServiceClient.resource(webAppAddress);
+
+ try {
+ Builder builder = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ builder = resource
+ .path("ws").path("v1").path("cluster")
+ .path("/scheduler-conf/format")
+ .accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON);
+ } else {
+ builder = resource
+ .path("ws").path("v1").path("cluster")
+ .path("/scheduler-conf/format").queryParam("user.name",
+ UserGroupInformation.getCurrentUser().getShortUserName())
+ .accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON);
+ }
+
+ response = builder.get(ClientResponse.class);
+ if (response != null) {
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ System.out.println(response.getEntity(String.class));
+ return 0;
+ } else {
+ System.err.println("Failed to format scheduler configuration: " +
+ response.getEntity(String.class));
+ }
+ } else {
+ System.err.println("Failed to format scheduler configuration: " +
+ "null response");
+ }
+ return -1;
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ webServiceClient.destroy();
+ }
+ }
+
+ @VisibleForTesting
+ int updateSchedulerConfOnRMNode(String webAppAddress,
SchedConfUpdateInfo updateInfo) throws Exception {
Client webServiceClient = Client.create();
ClientResponse response = null;
+ WebResource resource = webServiceClient.resource(webAppAddress);
+
try {
- Builder builder = webServiceClient.resource(webAppAddress)
- .path("ws").path("v1").path("cluster")
- .path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
+ Builder builder = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ builder = resource.path("ws").path("v1").path("cluster")
+ .path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
+ } else {
+ builder = resource.path("ws").path("v1").path("cluster")
+ .queryParam("user.name",
+ UserGroupInformation.getCurrentUser().getShortUserName())
+ .path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
+ }
+
builder.entity(YarnWebServiceUtils.toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON);
response = builder.put(ClientResponse.class);
@@ -253,7 +325,8 @@ public class SchedConfCLI extends Configured implements Tool {
+ "[-remove \"queueRemovePath1;queueRemovePath2\"] "
+ "[-update \"queueUpdatePath1:confKey1=confVal1\"] "
+ "[-global globalConfKey1=globalConfVal1,"
- + "globalConfKey2=globalConfVal2]\n"
+ + "globalConfKey2=globalConfVal2] "
+ + "[-format]\n"
+ "Example (adding queues): yarn schedulerconf -add "
+ "\"root.a.a1:capacity=100,maximum-capacity=100;root.a.a2:capacity=0,"
+ "maximum-capacity=0\"\n"
@@ -264,6 +337,8 @@ public class SchedConfCLI extends Configured implements Tool {
+ "maximum-capacity=75\"\n"
+ "Example (global scheduler update): yarn schedulerconf "
+ "-global yarn.scheduler.capacity.maximum-applications=10000\n"
+ + "Example (format scheduler configuration): yarn schedulerconf "
+ + "-format\n"
+ "Note: This is an alpha feature, the syntax/options are subject to "
+ "change, please run at your own risk.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
index 5364e83019c..8a6c9c20d94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
@@ -22,12 +22,48 @@ import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.PrintStream;
+import java.security.Principal;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import com.google.inject.Guice;
+import com.google.inject.Singleton;
+import com.google.inject.servlet.ServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
+import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletResponse;
+import javax.servlet.ServletRequest;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import javax.servlet.http.HttpServletResponse;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -35,7 +71,7 @@ import static org.junit.Assert.assertTrue;
/**
* Class for testing {@link SchedConfCLI}.
*/
-public class TestSchedConfCLI {
+public class TestSchedConfCLI extends JerseyTestBase {
private ByteArrayOutputStream sysOutStream;
private PrintStream sysOut;
@@ -45,6 +81,23 @@ public class TestSchedConfCLI {
private SchedConfCLI cli;
+ private static MockRM rm;
+ private static String userName;
+ private static CapacitySchedulerConfiguration csConf;
+
+ private static final File CONF_FILE = new File(new File("target",
+ "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
+ private static final File OLD_CONF_FILE = new File(new File("target",
+ "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");
+
+ public TestSchedConfCLI() {
+ super(new WebAppDescriptor.Builder(
+ "org.apache.hadoop.yarn.server.resourcemanager.webapp")
+ .contextListenerClass(GuiceServletConfig.class)
+ .filterClass(com.google.inject.servlet.GuiceFilter.class)
+ .contextPath("jersey-guice-filter").servletPath("/").build());
+ }
+
@Before
public void setUp() {
sysOutStream = new ByteArrayOutputStream();
@@ -58,6 +111,138 @@ public class TestSchedConfCLI {
cli = new SchedConfCLI();
}
+ private static class WebServletModule extends ServletModule {
+ @Override
+ protected void configureServlets() {
+ bind(JAXBContextResolver.class);
+ bind(RMWebServices.class);
+ bind(GenericExceptionHandler.class);
+ Configuration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.MEMORY_CONFIGURATION_STORE);
+
+ try {
+ userName = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unable to get current user name "
+ + ioe.getMessage(), ioe);
+ }
+
+ csConf = new CapacitySchedulerConfiguration(new Configuration(false),
+ false);
+ setupQueueConfiguration(csConf);
+
+ try {
+ if (CONF_FILE.exists()) {
+ if (!CONF_FILE.renameTo(OLD_CONF_FILE)) {
+ throw new RuntimeException("Failed to rename conf file");
+ }
+ }
+ FileOutputStream out = new FileOutputStream(CONF_FILE);
+ csConf.writeXml(out);
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to write XML file", e);
+ }
+
+ rm = new MockRM(conf);
+ bind(ResourceManager.class).toInstance(rm);
+ serve("/*").with(GuiceContainer.class);
+ filter("/*").through(TestRMCustomAuthFilter.class);
+ }
+ }
+
+ /**
+ * Custom filter which sets the Remote User for testing purpose.
+ */
+ @Singleton
+ public static class TestRMCustomAuthFilter extends AuthenticationFilter {
+ @Override
+ public void init(FilterConfig filterConfig) {
+
+ }
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
+ FilterChain filterChain) throws IOException, ServletException {
+ HttpServletRequest httpRequest = (HttpServletRequest)request;
+ HttpServletResponse httpResponse = (HttpServletResponse) response;
+ httpRequest = new HttpServletRequestWrapper(httpRequest) {
+ public String getAuthType() {
+ return null;
+ }
+
+ public String getRemoteUser() {
+ return userName;
+ }
+
+ public Principal getUserPrincipal() {
+ return new Principal() {
+ @Override
+ public String getName() {
+ return userName;
+ }
+ };
+ }
+ };
+ doFilter(filterChain, httpRequest, httpResponse);
+ }
+ }
+
+ private static void setupQueueConfiguration(
+ CapacitySchedulerConfiguration config) {
+ config.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[]{"testqueue"});
+ String a = CapacitySchedulerConfiguration.ROOT + ".testqueue";
+ config.setCapacity(a, 100f);
+ config.setMaximumCapacity(a, 100f);
+ }
+
+ @Test(timeout = 10000)
+ public void testFormatSchedulerConf() throws Exception {
+ try {
+ super.setUp();
+ GuiceServletConfig.setInjector(
+ Guice.createInjector(new WebServletModule()));
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ MutableConfigurationProvider provider =
+ ((MutableConfScheduler) scheduler).getMutableConfProvider();
+
+ SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
+ HashMap globalUpdates = new HashMap<>();
+ globalUpdates.put("schedKey1", "schedVal1");
+ schedUpdateInfo.setGlobalParams(globalUpdates);
+
+ provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
+ schedUpdateInfo);
+ rm.getRMContext().getRMAdminService().refreshQueues();
+ provider.confirmPendingMutation(true);
+
+ Configuration schedulerConf = provider.getConfiguration();
+ assertEquals("schedVal1", schedulerConf.get("schedKey1"));
+
+ int exitCode = cli.formatSchedulerConf("", resource());
+ assertEquals(0, exitCode);
+
+ schedulerConf = provider.getConfiguration();
+ assertNull(schedulerConf.get("schedKey1"));
+ } finally {
+ if (rm != null) {
+ rm.stop();
+ }
+ CONF_FILE.delete();
+ if (OLD_CONF_FILE.exists()) {
+ if (!OLD_CONF_FILE.renameTo(CONF_FILE)) {
+ throw new RuntimeException("Failed to re-copy old" +
+ " configuration file");
+ }
+ }
+ super.tearDown();
+ }
+ }
+
@Test(timeout = 10000)
public void testInvalidConf() throws Exception {
// conf pair with no key should be invalid
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
index 6e56f3dd809..9e843dfe89c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
@@ -65,6 +65,8 @@ public interface MutableConfigurationProvider {
*/
Configuration getConfiguration();
+ void formatConfigurationInStore(Configuration conf) throws Exception;
+
/**
* Closes the configuration provider, releasing any required resources.
* @throws IOException on failure to close
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
index 2a24887f72b..ddc5c8acdd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -156,6 +156,11 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
+ finalConfigPath);
}
+ @Override
+ public void format() throws Exception {
+ fileSystem.delete(schedulerConfDir, true);
+ }
+
private Path getFinalConfigPath(Path tempPath) {
String tempConfigPathStr = tempPath.getName();
if (!tempConfigPathStr.endsWith(TMP)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index d69c23690c9..4871443e54a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -60,6 +60,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
pendingMutation = null;
}
+ @Override
+ public void format() {
+ this.schedConf = null;
+ }
+
@Override
public synchronized Configuration retrieve() {
return schedConf;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 21de7a26d61..0792d7f3dc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -98,6 +98,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}
}
+ @Override
+ public void format() throws Exception {
+ close();
+ FileSystem fs = FileSystem.getLocal(conf);
+ fs.delete(getStorageDir(), true);
+ }
+
private void initDatabase(Configuration config) throws Exception {
Path storeRoot = createStorageDir();
Options options = new Options();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 51de4370666..6d153dccaa1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* CS configuration provider which implements
@@ -58,6 +59,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
private ConfigurationMutationACLPolicy aclMutationPolicy;
private RMContext rmContext;
+ private final ReentrantReadWriteLock formatLock =
+ new ReentrantReadWriteLock();
+
public MutableCSConfigurationProvider(RMContext rmContext) {
this.rmContext = rmContext;
}
@@ -151,17 +155,51 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
}
}
+ @Override
+ public void formatConfigurationInStore(Configuration config)
+ throws Exception {
+ formatLock.writeLock().lock();
+ try {
+ confStore.format();
+ Configuration initialSchedConf = new Configuration(false);
+ initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
+ this.schedConf = new Configuration(false);
+ // We need to explicitly set the key-values in schedConf, otherwise
+ // these configuration keys cannot be deleted when
+ // configuration is reloaded.
+ for (Map.Entry kv : initialSchedConf) {
+ schedConf.set(kv.getKey(), kv.getValue());
+ }
+ confStore.initialize(config, schedConf, rmContext);
+ confStore.checkVersion();
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ formatLock.writeLock().unlock();
+ }
+ }
+
@Override
public void confirmPendingMutation(boolean isValid) throws Exception {
- confStore.confirmMutation(isValid);
- if (!isValid) {
- schedConf = oldConf;
+ formatLock.readLock().lock();
+ try {
+ confStore.confirmMutation(isValid);
+ if (!isValid) {
+ schedConf = oldConf;
+ }
+ } finally {
+ formatLock.readLock().unlock();
}
}
@Override
public void reloadConfigurationFromStore() throws Exception {
- schedConf = confStore.retrieve();
+ formatLock.readLock().lock();
+ try {
+ schedConf = confStore.retrieve();
+ } finally {
+ formatLock.readLock().unlock();
+ }
}
private List getSiblingQueues(String queuePath, Configuration conf) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index ef0a44b0789..493f38e731f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -125,6 +125,13 @@ public abstract class YarnConfigurationStore {
*/
public abstract Configuration retrieve() throws IOException;
+
+ /**
+ * Format the persisted configuration.
+ * @throws IOException on failure to format
+ */
+ public abstract void format() throws Exception;
+
/**
* Get a list of confirmed configuration mutations starting from a given id.
* @param fromId id from which to start getting mutations, inclusive
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
index 7c224a5813d..766029b8de8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -132,6 +132,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
return null;
}
+ @Override
+ public void format() throws Exception {
+ zkManager.delete(confStorePath);
+ }
+
@Override
public synchronized void storeVersion() throws Exception {
byte[] data =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
index a3fd2a95d23..4479c949091 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
@@ -48,6 +48,9 @@ public final class RMWSConsts {
/** Path for {@code RMWebServices#updateSchedulerConfiguration}. */
public static final String SCHEDULER_CONF = "/scheduler-conf";
+ /** Path for {@code RMWebServices#formatSchedulerConfiguration}. */
+ public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format";
+
/** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */
public static final String SCHEDULER_LOGS = "/scheduler/logs";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 33858ea6c6d..56af548d4cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -2319,6 +2319,37 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return rm.getClientRMService().getContainers(request).getContainerList();
}
+ @GET
+ @Path(RMWSConsts.FORMAT_SCHEDULER_CONF)
+ @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
+ MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
+ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
+ throws AuthorizationException {
+ // Only admin user allowed to format scheduler conf in configuration store
+ UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+ initForWritableEndpoints(callerUGI, true);
+
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ if (scheduler instanceof MutableConfScheduler
+ && ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
+ try {
+ MutableConfigurationProvider mutableConfigurationProvider =
+ ((MutableConfScheduler) scheduler).getMutableConfProvider();
+ mutableConfigurationProvider.formatConfigurationInStore(conf);
+ return Response.status(Status.OK).entity("Configuration under " +
+ "store successfully formatted.").build();
+ } catch (Exception e) {
+ LOG.error("Exception thrown when formating configuration", e);
+ return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
+ .build();
+ }
+ } else {
+ return Response.status(Status.BAD_REQUEST)
+ .entity("Configuration change only supported by " +
+ "MutableConfScheduler.").build();
+ }
+ }
+
@PUT
@Path(RMWSConsts.SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index 65314be447e..f3d5e745b1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -36,6 +36,9 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
/**
* Tests {@link FSSchedulerConfigurationStore}.
@@ -135,6 +138,19 @@ public class TestFSSchedulerConfigurationStore {
compareConfig(conf, storeConf);
}
+ @Test
+ public void testFormatConfiguration() throws Exception {
+ assertTrue(testSchedulerConfigurationDir.exists());
+ Configuration schedulerConf = new Configuration();
+ schedulerConf.set("a", "a");
+ writeConf(schedulerConf);
+ configurationStore.initialize(conf, conf, null);
+ Configuration storedConfig = configurationStore.retrieve();
+ assertEquals("a", storedConfig.get("a"));
+ configurationStore.format();
+ assertFalse(testSchedulerConfigurationDir.exists());
+ }
+
@Test
public void retrieve() throws Exception {
Configuration schedulerConf = new Configuration();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 81bc7a71ee5..c125491d8e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -104,6 +104,10 @@ public class TestMutableCSConfigurationProvider {
confProvider.confirmPendingMutation(false);
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
+
+ confProvider.formatConfigurationInStore(conf);
+ assertNull(confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.goodKey"));
}
@Test
@@ -137,6 +141,10 @@ public class TestMutableCSConfigurationProvider {
assertNull(confProvider.loadConfiguration(conf).get(
"yarn.scheduler.capacity.root.a.badKey"));
+ confProvider.formatConfigurationInStore(conf);
+ assertNull(confProvider.loadConfiguration(conf)
+ .get("yarn.scheduler.capacity.root.a.goodKey"));
+
}
private void writeConf(Configuration conf, String storePath)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 6e7cb545d30..6a930e868a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -128,6 +128,15 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
}
+ @Test
+ public void testFormatConfiguration() throws Exception {
+ schedConf.set("key", "val");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.format();
+ assertNull(confStore.retrieve());
+ }
+
@Test
public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
index 3e2542c7deb..67f83c8d647 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java
@@ -189,6 +189,19 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
assertEquals(3, orgConf.getQueues("root").length);
}
+ @Test
+ public void testFormatSchedulerConf() throws Exception {
+ testAddNestedQueue();
+ WebResource r = resource();
+ ClientResponse response = r.path("ws").path("v1").path("cluster")
+ .queryParam("user.name", userName)
+ .path(RMWSConsts.FORMAT_SCHEDULER_CONF)
+ .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ CapacitySchedulerConfiguration orgConf = getSchedulerConf();
+ assertEquals(3, orgConf.getQueues("root").length);
+ }
+
@Test
public void testAddNestedQueue() throws Exception {
CapacitySchedulerConfiguration orgConf = getSchedulerConf();