From c15f25363b7093d88c2dc65847c9bf6619a082a7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 7 Jan 2014 22:17:00 +0000 Subject: [PATCH] YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager fail-over. Contributed by Xuan Gong. svn merge --ignore-ancestry -c 1556380 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1556381 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/client/TestRMFailover.java | 73 +++++++++++++++++++ .../hadoop/yarn/client/ClientRMProxy.java | 0 .../resourcemanager/ResourceManager.java | 36 +++++---- .../server/webproxy/AppReportFetcher.java | 29 ++++---- .../yarn/server/webproxy/WebAppProxy.java | 3 + 6 files changed, 117 insertions(+), 27 deletions(-) rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-client => hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java (100%) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fce74781cb1..42270d8194f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -182,6 +182,9 @@ Release 2.4.0 - UNRELEASED YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize app-attempts separately from apps. (Jian He via vinodkv) + YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager + fail-over. (Xuan Gong via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 96f1bbc1f6e..8900b160dc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,13 +33,18 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -55,6 +62,8 @@ public class TestRMFailover extends ClientBaseWithFixes { private Configuration conf; private MiniYARNCluster cluster; + private ApplicationId fakeAppId; + private void setConfForRM(String rmId, String prefix, String value) { conf.set(HAUtil.addSuffix(prefix, rmId), value); @@ -77,6 +86,7 @@ public class TestRMFailover extends ClientBaseWithFixes { @Before public void setup() throws IOException { + fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0); conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); @@ -179,4 +189,67 @@ public class TestRMFailover extends ClientBaseWithFixes { failover(); verifyConnections(); } + + @Test + public void testWebAppProxyInStandAloneMode() throws YarnException, + InterruptedException, IOException { + WebAppProxyServer webAppProxyServer = new WebAppProxyServer(); + try { + conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099"); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + webAppProxyServer.init(conf); + + // Start webAppProxyServer + Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState()); + webAppProxyServer.start(); + Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState()); + + URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId); + HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl + .openConnection(); + + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + + explicitFailover(); + verifyConnections(); + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + } finally { + webAppProxyServer.stop(); + } + } + + @Test + public void testEmbeddedWebAppProxy() throws YarnException, + InterruptedException, IOException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId); + HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl + .openConnection(); + + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + + explicitFailover(); + verifyConnections(); + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + } + + private void verifyExpectedException(String exceptionMessage){ + assertTrue(exceptionMessage.contains(ApplicationNotFoundException.class + .getName())); + assertTrue(exceptionMessage + .contains("Application with id '" + fakeAppId + "' " + + "doesn't exist in RM.")); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java similarity index 100% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 3dabdb2adc3..16c7ac7eedd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -147,9 +147,12 @@ public class ResourceManager extends CompositeService implements Recoverable { protected QueueACLsManager queueACLsManager; private DelegationTokenRenewer delegationTokenRenewer; private WebApp webApp; + private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; private boolean recoveryEnabled; + private String webAppAddress; + /** End of Active services */ private Configuration conf; @@ -194,6 +197,8 @@ public class ResourceManager extends CompositeService implements Recoverable { } createAndInitActiveServices(); + webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf); + super.serviceInit(conf); } @@ -437,22 +442,12 @@ public class ResourceManager extends CompositeService implements Recoverable { throw e; } } - startWepApp(); - - if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { - int port = webApp.port(); - WebAppUtils.setRMWebAppPort(conf, port); - } super.serviceStart(); } @Override protected void serviceStop() throws Exception { - if (webApp != null) { - webApp.stop(); - } - DefaultMetricsSystem.shutdown(); @@ -752,12 +747,16 @@ public class ResourceManager extends CompositeService implements Recoverable { YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) .withHttpSpnegoKeytabKey( YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) - .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); + .at(webAppAddress); String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf); if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). equals(proxyHostAndPort)) { - AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService()); - builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, + if (HAUtil.isHAEnabled(conf)) { + fetcher = new AppReportFetcher(conf); + } else { + fetcher = new AppReportFetcher(conf, getClientRMService()); + } + builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher); String[] proxyParts = proxyHostAndPort.split(":"); @@ -854,6 +853,11 @@ public class ResourceManager extends CompositeService implements Recoverable { transitionToActive(); } + startWepApp(); + if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + int port = webApp.port(); + WebAppUtils.setRMWebAppPort(conf, port); + } super.serviceStart(); } @@ -864,6 +868,12 @@ public class ResourceManager extends CompositeService implements Recoverable { @Override protected void serviceStop() throws Exception { + if (webApp != null) { + webApp.stop(); + } + if (fetcher != null) { + fetcher.stop(); + } super.serviceStop(); transitionToStandby(false); rmContext.setHAServiceState(HAServiceState.STOPPING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java index 077783facfc..5c93413dc07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java @@ -19,21 +19,20 @@ package org.apache.hadoop.yarn.server.webproxy; import java.io.IOException; -import java.net.InetSocketAddress; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; /** * This class abstracts away how ApplicationReports are fetched. @@ -50,16 +49,12 @@ public class AppReportFetcher { */ public AppReportFetcher(Configuration conf) { this.conf = conf; - YarnRPC rpc = YarnRPC.create(this.conf); - InetSocketAddress rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); - LOG.info("Connecting to ResourceManager at " + rmAddress); - applicationsManager = - (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class, - rmAddress, this.conf); - LOG.info("Connected to ResourceManager at " + rmAddress); + try { + applicationsManager = ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } } /** @@ -91,4 +86,10 @@ public class AppReportFetcher { .getApplicationReport(request); return response.getApplicationReport(); } + + public void stop() { + if (this.applicationsManager != null) { + RPC.stopProxy(this.applicationsManager); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java index 2fbb0b886f0..fae279a238b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java @@ -117,6 +117,9 @@ public class WebAppProxy extends AbstractService { throw new YarnRuntimeException("Error stopping proxy web server",e); } } + if(this.fetcher != null) { + this.fetcher.stop(); + } super.serviceStop(); }