diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index 299bd187826..26391dbe06e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -64,6 +64,16 @@ org.apache.hadoop hadoop-annotations + + com.sun.jersey.jersey-test-framework + jersey-test-framework-core + test + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + test + org.mockito mockito-all diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 33f9450db40..d134ba36bbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -25,6 +25,7 @@ import com.sun.jersey.api.client.ClientRequest; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.WebResource.Builder; import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; @@ -155,6 +156,9 @@ public class LogsCLI extends Configured implements Tool { if (yarnClient != null) { yarnClient.close(); } + if (webServiceClient != null) { + webServiceClient.destroy(); + } } } @@ -418,24 +422,34 @@ public class LogsCLI extends Configured implements Tool { } protected List getAMContainerInfoForRMWebService( - Configuration conf, String appId) throws ClientHandlerException, + Configuration conf, String appId) throws Exception { + return WebAppUtils.execOnActiveRM(conf, this::getAMContainerInfoFromRM, + appId); + } + + private List getAMContainerInfoFromRM( + String webAppAddress, String appId) throws ClientHandlerException, UniformInterfaceException, JSONException { - String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf); - - WebResource webResource = webServiceClient.resource(webAppAddress); - - ClientResponse response = - webResource.path("ws").path("v1").path("cluster").path("apps") - .path(appId).path("appattempts").accept(MediaType.APPLICATION_JSON) - .get(ClientResponse.class); - JSONObject json = - response.getEntity(JSONObject.class).getJSONObject("appAttempts"); - JSONArray requests = json.getJSONArray("appAttempt"); List amContainersList = new ArrayList(); - for (int i = 0; i < requests.length(); i++) { - amContainersList.add(requests.getJSONObject(i)); + ClientResponse response = null; + try { + Builder builder = webServiceClient.resource(webAppAddress) + .path("ws").path("v1").path("cluster") + .path("apps").path(appId).path("appattempts") + .accept(MediaType.APPLICATION_JSON); + response = builder.get(ClientResponse.class); + JSONObject json = response.getEntity(JSONObject.class) + .getJSONObject("appAttempts"); + JSONArray requests = json.getJSONArray("appAttempt"); + for (int j = 0; j < requests.length(); j++) { + amContainersList.add(requests.getJSONObject(j)); + } + return amContainersList; + } finally { + if (response != null) { + response.close(); + } } - return amContainersList; } private List getAMContainerInfoForAHSWebService( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java index 11bfdd7ba14..273b50d27cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/SchedConfCLI.java @@ -22,13 +22,16 @@ import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.WebResource.Builder; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.cli.Options; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; @@ -55,6 +58,7 @@ public class SchedConfCLI extends Configured implements Tool { private static final String REMOVE_QUEUES_OPTION = "removeQueues"; private static final String UPDATE_QUEUES_OPTION = "updateQueues"; private static final String GLOBAL_OPTIONS = "globalUpdates"; + private static final String FORMAT_CONF = "formatConfig"; private static final String HELP_CMD = "help"; private static final String CONF_ERR_MSG = "Specify configuration key " + @@ -82,6 +86,9 @@ public class SchedConfCLI extends Configured implements Tool { "Update queue configurations"); opts.addOption("global", GLOBAL_OPTIONS, true, "Update global scheduler configurations"); + opts.addOption("format", FORMAT_CONF, false, + "Format Scheduler Configuration and reload from" + + " capacity-scheduler.xml"); opts.addOption("h", HELP_CMD, false, "Displays help for all commands."); int exitCode = -1; @@ -100,6 +107,7 @@ public class SchedConfCLI extends Configured implements Tool { } boolean hasOption = false; + boolean format = false; SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); try { if (parsedCli.hasOption(ADD_QUEUES_OPTION)) { @@ -120,6 +128,11 @@ public class SchedConfCLI extends Configured implements Tool { hasOption = true; globalUpdates(parsedCli.getOptionValue(GLOBAL_OPTIONS), updateInfo); } + if (parsedCli.hasOption((FORMAT_CONF))) { + hasOption = true; + format = true; + } + } catch (IllegalArgumentException e) { System.err.println(e.getMessage()); return -1; @@ -131,28 +144,103 @@ public class SchedConfCLI extends Configured implements Tool { return -1; } - Client webServiceClient = Client.create(); - WebResource webResource = webServiceClient.resource(WebAppUtils. - getRMWebAppURLWithScheme(getConf())); - ClientResponse response = webResource.path("ws").path("v1").path("cluster") - .path("scheduler-conf").accept(MediaType.APPLICATION_JSON) - .entity(YarnWebServiceUtils.toJson(updateInfo, - SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON) - .put(ClientResponse.class); - if (response != null) { - if (response.getStatus() == Status.OK.getStatusCode()) { - System.out.println("Configuration changed successfully."); - return 0; - } else { - System.err.println("Configuration change unsuccessful: " - + response.getEntity(String.class)); - } + Configuration conf = getConf(); + if (format) { + return WebAppUtils.execOnActiveRM(conf, this::formatSchedulerConf, null); } else { - System.err.println("Configuration change unsuccessful: null response"); + return WebAppUtils.execOnActiveRM(conf, + this::updateSchedulerConfOnRMNode, updateInfo); } - return -1; } + @VisibleForTesting + int formatSchedulerConf(String webAppAddress, WebResource resource) + throws Exception { + Client webServiceClient = Client.create(); + ClientResponse response = null; + resource = (resource != null) ? resource : + webServiceClient.resource(webAppAddress); + + try { + Builder builder = null; + if (UserGroupInformation.isSecurityEnabled()) { + builder = resource + .path("ws").path("v1").path("cluster") + .path("/scheduler-conf/format") + .accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON); + } else { + builder = resource + .path("ws").path("v1").path("cluster") + .path("/scheduler-conf/format").queryParam("user.name", + UserGroupInformation.getCurrentUser().getShortUserName()) + .accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON); + } + + response = builder.get(ClientResponse.class); + if (response != null) { + if (response.getStatus() == Status.OK.getStatusCode()) { + System.out.println(response.getEntity(String.class)); + return 0; + } else { + System.err.println("Failed to format scheduler configuration: " + + response.getEntity(String.class)); + } + } else { + System.err.println("Failed to format scheduler configuration: " + + "null response"); + } + return -1; + } finally { + if (response != null) { + response.close(); + } + webServiceClient.destroy(); + } + } + + @VisibleForTesting + int updateSchedulerConfOnRMNode(String webAppAddress, + SchedConfUpdateInfo updateInfo) throws Exception { + Client webServiceClient = Client.create(); + ClientResponse response = null; + WebResource resource = webServiceClient.resource(webAppAddress); + + try { + Builder builder = null; + if (UserGroupInformation.isSecurityEnabled()) { + builder = resource.path("ws").path("v1").path("cluster") + .path("scheduler-conf").accept(MediaType.APPLICATION_JSON); + } else { + builder = resource.path("ws").path("v1").path("cluster") + .queryParam("user.name", + UserGroupInformation.getCurrentUser().getShortUserName()) + .path("scheduler-conf").accept(MediaType.APPLICATION_JSON); + } + + builder.entity(YarnWebServiceUtils.toJson(updateInfo, + SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON); + response = builder.put(ClientResponse.class); + if (response != null) { + if (response.getStatus() == Status.OK.getStatusCode()) { + System.out.println("Configuration changed successfully."); + return 0; + } else { + System.err.println("Configuration change unsuccessful: " + + response.getEntity(String.class)); + } + } else { + System.err.println("Configuration change unsuccessful: null response"); + } + return -1; + } finally { + if (response != null) { + response.close(); + } + webServiceClient.destroy(); + } + } + + @VisibleForTesting void addQueues(String args, SchedConfUpdateInfo updateInfo) { if (args == null) { @@ -237,7 +325,8 @@ public class SchedConfCLI extends Configured implements Tool { + "[-remove \"queueRemovePath1;queueRemovePath2\"] " + "[-update \"queueUpdatePath1:confKey1=confVal1\"] " + "[-global globalConfKey1=globalConfVal1," - + "globalConfKey2=globalConfVal2]\n" + + "globalConfKey2=globalConfVal2] " + + "[-format]\n" + "Example (adding queues): yarn schedulerconf -add " + "\"root.a.a1:capacity=100,maximum-capacity=100;root.a.a2:capacity=0," + "maximum-capacity=0\"\n" @@ -248,6 +337,8 @@ public class SchedConfCLI extends Configured implements Tool { + "maximum-capacity=75\"\n" + "Example (global scheduler update): yarn schedulerconf " + "-global yarn.scheduler.capacity.maximum-applications=10000\n" + + "Example (format scheduler configuration): yarn schedulerconf " + + "-format\n" + "Note: This is an alpha feature, the syntax/options are subject to " + "change, please run at your own risk."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java index 5364e83019c..598020d6f8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java @@ -22,12 +22,48 @@ import org.junit.Before; import org.junit.Test; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.PrintStream; +import java.security.Principal; +import java.util.HashMap; import java.util.List; import java.util.Map; + +import com.google.inject.Guice; +import com.google.inject.Singleton; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.GuiceServletConfig; +import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletResponse; +import javax.servlet.ServletRequest; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import javax.servlet.http.HttpServletResponse; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -35,7 +71,7 @@ import static org.junit.Assert.assertTrue; /** * Class for testing {@link SchedConfCLI}. */ -public class TestSchedConfCLI { +public class TestSchedConfCLI extends JerseyTestBase { private ByteArrayOutputStream sysOutStream; private PrintStream sysOut; @@ -45,6 +81,23 @@ public class TestSchedConfCLI { private SchedConfCLI cli; + private static MockRM rm; + private static String userName; + private static CapacitySchedulerConfiguration csConf; + + private static final File CONF_FILE = new File(new File("target", + "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE); + private static final File OLD_CONF_FILE = new File(new File("target", + "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp"); + + public TestSchedConfCLI() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.yarn.server.resourcemanager.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + .contextPath("jersey-guice-filter").servletPath("/").build()); + } + @Before public void setUp() { sysOutStream = new ByteArrayOutputStream(); @@ -58,6 +111,138 @@ public class TestSchedConfCLI { cli = new SchedConfCLI(); } + private static class WebServletModule extends ServletModule { + @Override + protected void configureServlets() { + bind(JAXBContextResolver.class); + bind(RMWebServices.class); + bind(GenericExceptionHandler.class); + Configuration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + + try { + userName = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException ioe) { + throw new RuntimeException("Unable to get current user name " + + ioe.getMessage(), ioe); + } + + csConf = new CapacitySchedulerConfiguration(new Configuration(false), + false); + setupQueueConfiguration(csConf); + + try { + if (CONF_FILE.exists()) { + if (!CONF_FILE.renameTo(OLD_CONF_FILE)) { + throw new RuntimeException("Failed to rename conf file"); + } + } + FileOutputStream out = new FileOutputStream(CONF_FILE); + csConf.writeXml(out); + out.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to write XML file", e); + } + + rm = new MockRM(conf); + bind(ResourceManager.class).toInstance(rm); + serve("/*").with(GuiceContainer.class); + filter("/*").through(TestRMCustomAuthFilter.class); + } + } + + /** + * Custom filter which sets the Remote User for testing purpose. + */ + @Singleton + public static class TestRMCustomAuthFilter extends AuthenticationFilter { + @Override + public void init(FilterConfig filterConfig) { + + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, + FilterChain filterChain) throws IOException, ServletException { + HttpServletRequest httpRequest = (HttpServletRequest)request; + HttpServletResponse httpResponse = (HttpServletResponse) response; + httpRequest = new HttpServletRequestWrapper(httpRequest) { + public String getAuthType() { + return null; + } + + public String getRemoteUser() { + return userName; + } + + public Principal getUserPrincipal() { + return new Principal() { + @Override + public String getName() { + return userName; + } + }; + } + }; + doFilter(filterChain, httpRequest, httpResponse); + } + } + + private static void setupQueueConfiguration( + CapacitySchedulerConfiguration config) { + config.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"testqueue"}); + String a = CapacitySchedulerConfiguration.ROOT + ".testqueue"; + config.setCapacity(a, 100f); + config.setMaximumCapacity(a, 100f); + } + + @Test(timeout = 100000) + public void testFormatSchedulerConf() throws Exception { + try { + super.setUp(); + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + ResourceScheduler scheduler = rm.getResourceScheduler(); + MutableConfigurationProvider provider = + ((MutableConfScheduler) scheduler).getMutableConfProvider(); + + SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo(); + HashMap globalUpdates = new HashMap<>(); + globalUpdates.put("schedKey1", "schedVal1"); + schedUpdateInfo.setGlobalParams(globalUpdates); + + provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(), + schedUpdateInfo); + rm.getRMContext().getRMAdminService().refreshQueues(); + provider.confirmPendingMutation(true); + + Configuration schedulerConf = provider.getConfiguration(); + assertEquals("schedVal1", schedulerConf.get("schedKey1")); + + int exitCode = cli.formatSchedulerConf("", resource()); + assertEquals(0, exitCode); + + schedulerConf = provider.getConfiguration(); + assertNull(schedulerConf.get("schedKey1")); + } finally { + if (rm != null) { + rm.stop(); + } + CONF_FILE.delete(); + if (OLD_CONF_FILE.exists()) { + if (!OLD_CONF_FILE.renameTo(CONF_FILE)) { + throw new RuntimeException("Failed to re-copy old" + + " configuration file"); + } + } + super.tearDown(); + } + } + @Test(timeout = 10000) public void testInvalidConf() throws Exception { // conf pair with no key should be invalid diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java index e62bf104ae4..09daf424d15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java @@ -90,8 +90,41 @@ public class WebAppUtils { } } + /** + * Runs a certain function against the active RM. The function's first + * argument is expected to be a string which contains the address of + * the RM being tried. + */ + public static R execOnActiveRM(Configuration conf, + ThrowingBiFunction func, T arg) throws Exception { + String rm1Address = getRMWebAppURLWithScheme(conf, 0); + try { + return func.apply(rm1Address, arg); + } catch (Exception e) { + if (HAUtil.isHAEnabled(conf)) { + int rms = HAUtil.getRMHAIds(conf).size(); + for (int i=1; i { + R apply(T t, U u) throws Exception; + } + public static String getRMWebAppURLWithoutScheme(Configuration conf, - boolean isHAEnabled) { + boolean isHAEnabled, int haIdIndex) { YarnConfiguration yarnConfig = new YarnConfiguration(conf); // set RM_ID if we have not configure it. if (isHAEnabled) { @@ -99,7 +132,7 @@ public class WebAppUtils { if (rmId == null || rmId.isEmpty()) { List rmIds = new ArrayList<>(HAUtil.getRMHAIds(conf)); if (rmIds != null && !rmIds.isEmpty()) { - yarnConfig.set(YarnConfiguration.RM_HA_ID, rmIds.get(0)); + yarnConfig.set(YarnConfiguration.RM_HA_ID, rmIds.get(haIdIndex)); } } } @@ -120,13 +153,19 @@ public class WebAppUtils { } } + public static String getRMWebAppURLWithScheme(Configuration conf, + int haIdIndex) { + return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme( + conf, HAUtil.isHAEnabled(conf), haIdIndex); + } + public static String getRMWebAppURLWithScheme(Configuration conf) { return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme( - conf, HAUtil.isHAEnabled(conf)); + conf, HAUtil.isHAEnabled(conf), 0); } public static String getRMWebAppURLWithoutScheme(Configuration conf) { - return getRMWebAppURLWithoutScheme(conf, false); + return getRMWebAppURLWithoutScheme(conf, false, 0); } public static String getRouterWebAppURLWithScheme(Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/YarnWebServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/YarnWebServiceUtils.java index 1cf1e9717e4..fccb3e1415f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/YarnWebServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/YarnWebServiceUtils.java @@ -21,7 +21,7 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.UniformInterfaceException; -import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.WebResource.Builder; import javax.ws.rs.core.MediaType; import com.sun.jersey.api.json.JSONJAXBContext; @@ -53,16 +53,36 @@ public final class YarnWebServiceUtils { public static JSONObject getNodeInfoFromRMWebService(Configuration conf, String nodeId) throws ClientHandlerException, UniformInterfaceException { + try { + return WebAppUtils.execOnActiveRM(conf, + YarnWebServiceUtils::getNodeInfoFromRM, nodeId); + } catch (Exception e) { + if (e instanceof ClientHandlerException) { + throw ((ClientHandlerException) e); + } else if (e instanceof UniformInterfaceException) { + throw ((UniformInterfaceException) e); + } else { + throw new RuntimeException(e); + } + } + } + + private static JSONObject getNodeInfoFromRM(String webAppAddress, + String nodeId) throws ClientHandlerException, UniformInterfaceException { Client webServiceClient = Client.create(); - String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf); - - WebResource webResource = webServiceClient.resource(webAppAddress); - - ClientResponse response = webResource.path("ws").path("v1") - .path("cluster").path("nodes") - .path(nodeId).accept(MediaType.APPLICATION_JSON) - .get(ClientResponse.class); - return response.getEntity(JSONObject.class); + ClientResponse response = null; + try { + Builder builder = webServiceClient.resource(webAppAddress) + .path("ws").path("v1").path("cluster") + .path("nodes").path(nodeId).accept(MediaType.APPLICATION_JSON); + response = builder.get(ClientResponse.class); + return response.getEntity(JSONObject.class); + } finally { + if (response != null) { + response.close(); + } + webServiceClient.destroy(); + } } @SuppressWarnings("rawtypes") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java index a053fdb9376..212e09c02e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java @@ -56,6 +56,12 @@ public class TestYarnConfiguration { conf2.set("yarn.resourcemanager.hostname.rm2", "40.40.40.40"); String rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2); Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2); + + rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2, 0); + Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2); + + rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2, 1); + Assert.assertEquals("http://40.40.40.40:8088", rmWebUrlinHA2); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 6e56f3dd809..9e843dfe89c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -65,6 +65,8 @@ public interface MutableConfigurationProvider { */ Configuration getConfiguration(); + void formatConfigurationInStore(Configuration conf) throws Exception; + /** * Closes the configuration provider, releasing any required resources. * @throws IOException on failure to close diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index d69c23690c9..4871443e54a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -60,6 +60,11 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore { pendingMutation = null; } + @Override + public void format() { + this.schedConf = null; + } + @Override public synchronized Configuration retrieve() { return schedConf; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 21de7a26d61..0792d7f3dc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -98,6 +98,13 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore { } } + @Override + public void format() throws Exception { + close(); + FileSystem fs = FileSystem.getLocal(conf); + fs.delete(getStorageDir(), true); + } + private void initDatabase(Configuration config) throws Exception { Path storeRoot = createStorageDir(); Options options = new Options(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index de3a282a963..5e8c915fc47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * CS configuration provider which implements @@ -58,6 +59,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, private ConfigurationMutationACLPolicy aclMutationPolicy; private RMContext rmContext; + private final ReentrantReadWriteLock formatLock = + new ReentrantReadWriteLock(); + public MutableCSConfigurationProvider(RMContext rmContext) { this.rmContext = rmContext; } @@ -148,17 +152,51 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, } } + @Override + public void formatConfigurationInStore(Configuration config) + throws Exception { + formatLock.writeLock().lock(); + try { + confStore.format(); + Configuration initialSchedConf = new Configuration(false); + initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE); + this.schedConf = new Configuration(false); + // We need to explicitly set the key-values in schedConf, otherwise + // these configuration keys cannot be deleted when + // configuration is reloaded. + for (Map.Entry kv : initialSchedConf) { + schedConf.set(kv.getKey(), kv.getValue()); + } + confStore.initialize(config, schedConf, rmContext); + confStore.checkVersion(); + } catch (Exception e) { + throw new IOException(e); + } finally { + formatLock.writeLock().unlock(); + } + } + @Override public void confirmPendingMutation(boolean isValid) throws Exception { - confStore.confirmMutation(isValid); - if (!isValid) { - schedConf = oldConf; + formatLock.readLock().lock(); + try { + confStore.confirmMutation(isValid); + if (!isValid) { + schedConf = oldConf; + } + } finally { + formatLock.readLock().unlock(); } } @Override public void reloadConfigurationFromStore() throws Exception { - schedConf = confStore.retrieve(); + formatLock.readLock().lock(); + try { + schedConf = confStore.retrieve(); + } finally { + formatLock.readLock().unlock(); + } } private List getSiblingQueues(String queuePath, Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 7fb52fcccd8..ed6761b96cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -125,6 +125,12 @@ public abstract class YarnConfigurationStore { */ public abstract Configuration retrieve(); + /** + * Format the persisted configuration. + * @throws IOException on failure to format + */ + public abstract void format() throws Exception; + /** * Get a list of confirmed configuration mutations starting from a given id. * @param fromId id from which to start getting mutations, inclusive diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java index 7c224a5813d..766029b8de8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java @@ -132,6 +132,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore { return null; } + @Override + public void format() throws Exception { + zkManager.delete(confStorePath); + } + @Override public synchronized void storeVersion() throws Exception { byte[] data = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java index 8d9ccf76a1c..4479c949091 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java @@ -45,6 +45,12 @@ public final class RMWSConsts { /** Path for {@code RMWebServiceProtocol#getSchedulerInfo}. */ public static final String SCHEDULER = "/scheduler"; + /** Path for {@code RMWebServices#updateSchedulerConfiguration}. */ + public static final String SCHEDULER_CONF = "/scheduler-conf"; + + /** Path for {@code RMWebServices#formatSchedulerConfiguration}. */ + public static final String FORMAT_SCHEDULER_CONF = "/scheduler-conf/format"; + /** Path for {@code RMWebServiceProtocol#dumpSchedulerLogs}. */ public static final String SCHEDULER_LOGS = "/scheduler/logs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index f9e115eb34b..933a09deb7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -853,7 +853,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } @GET - @Path("/apps/{appid}/appattempts/{appattemptid}/containers/{containerid}") + @Path(RMWSConsts.GET_CONTAINER) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Override @@ -867,7 +867,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } @GET - @Path("/apps/{appid}/state") + @Path(RMWSConsts.APPS_APPID_STATE) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Override @@ -2320,7 +2320,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } @PUT - @Path("/scheduler-conf") + @Path(RMWSConsts.SCHEDULER_CONF) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @@ -2372,7 +2372,39 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { } @GET - @Path("/scheduler-conf") + @Path(RMWSConsts.FORMAT_SCHEDULER_CONF) + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr) + throws AuthorizationException { + // Only admin user allowed to format scheduler conf in configuration store + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + initForWritableEndpoints(callerUGI, true); + + ResourceScheduler scheduler = rm.getResourceScheduler(); + if (scheduler instanceof MutableConfScheduler + && ((MutableConfScheduler) scheduler).isConfigurationMutable()) { + try { + MutableConfigurationProvider mutableConfigurationProvider = + ((MutableConfScheduler) scheduler).getMutableConfProvider(); + mutableConfigurationProvider.formatConfigurationInStore(conf); + return Response.status(Status.OK).entity("Configuration under " + + "store successfully formatted.").build(); + } catch (Exception e) { + LOG.error("Exception thrown when formating configuration", e); + return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) + .build(); + } + } else { + return Response.status(Status.BAD_REQUEST) + .entity("Configuration change only supported by " + + "MutableConfScheduler.").build(); + } + } + + + @GET + @Path(RMWSConsts.SCHEDULER_CONF) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) public Response getSchedulerConfiguration(@Context HttpServletRequest hsr) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 5d43ebbf2ac..1fcfc700404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -98,5 +98,9 @@ public class TestMutableCSConfigurationProvider { confProvider.confirmPendingMutation(false); assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); + + confProvider.formatConfigurationInStore(conf); + assertNull(confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.goodKey")); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java index 6e7cb545d30..6a930e868a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java @@ -128,6 +128,15 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest { } + @Test + public void testFormatConfiguration() throws Exception { + schedConf.set("key", "val"); + confStore.initialize(conf, schedConf, rmContext); + assertEquals("val", confStore.retrieve().get("key")); + confStore.format(); + assertNull(confStore.retrieve()); + } + @Test public void testPersistUpdatedConfiguration() throws Exception { confStore.initialize(conf, schedConf, rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java index 4a37e91fe30..0584ce818b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java @@ -185,6 +185,19 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase { assertEquals(3, orgConf.getQueues("root").length); } + @Test + public void testFormatSchedulerConf() throws Exception { + testAddNestedQueue(); + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("cluster") + .queryParam("user.name", userName) + .path(RMWSConsts.FORMAT_SCHEDULER_CONF) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration orgConf = getSchedulerConf(); + assertEquals(3, orgConf.getQueues("root").length); + } + @Test public void testAddNestedQueue() throws Exception { CapacitySchedulerConfiguration orgConf = getSchedulerConf();