YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager fail-over. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1556380 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1556381 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-07 22:17:00 +00:00
parent 4c4ceedd38
commit c15f25363b
6 changed files with 117 additions and 27 deletions

View File

@ -182,6 +182,9 @@ Release 2.4.0 - UNRELEASED
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
app-attempts separately from apps. (Jian He via vinodkv) app-attempts separately from apps. (Jian He via vinodkv)
YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager
fail-over. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,13 +33,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -55,6 +62,8 @@ public class TestRMFailover extends ClientBaseWithFixes {
private Configuration conf; private Configuration conf;
private MiniYARNCluster cluster; private MiniYARNCluster cluster;
private ApplicationId fakeAppId;
private void setConfForRM(String rmId, String prefix, String value) { private void setConfForRM(String rmId, String prefix, String value) {
conf.set(HAUtil.addSuffix(prefix, rmId), value); conf.set(HAUtil.addSuffix(prefix, rmId), value);
@ -77,6 +86,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@ -179,4 +189,67 @@ public class TestRMFailover extends ClientBaseWithFixes {
failover(); failover();
verifyConnections(); verifyConnections();
} }
@Test
public void testWebAppProxyInStandAloneMode() throws YarnException,
InterruptedException, IOException {
WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
try {
conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
webAppProxyServer.init(conf);
// Start webAppProxyServer
Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState());
webAppProxyServer.start();
Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState());
URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId);
HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
.openConnection();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
explicitFailover();
verifyConnections();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
} finally {
webAppProxyServer.stop();
}
}
@Test
public void testEmbeddedWebAppProxy() throws YarnException,
InterruptedException, IOException {
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId);
HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
.openConnection();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
explicitFailover();
verifyConnections();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
}
private void verifyExpectedException(String exceptionMessage){
assertTrue(exceptionMessage.contains(ApplicationNotFoundException.class
.getName()));
assertTrue(exceptionMessage
.contains("Application with id '" + fakeAppId + "' " +
"doesn't exist in RM."));
}
} }

View File

@ -147,9 +147,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected QueueACLsManager queueACLsManager; protected QueueACLsManager queueACLsManager;
private DelegationTokenRenewer delegationTokenRenewer; private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp; private WebApp webApp;
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker; protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled; private boolean recoveryEnabled;
private String webAppAddress;
/** End of Active services */ /** End of Active services */
private Configuration conf; private Configuration conf;
@ -194,6 +197,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
createAndInitActiveServices(); createAndInitActiveServices();
webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -437,22 +442,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
throw e; throw e;
} }
} }
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart(); super.serviceStart();
} }
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
@ -752,11 +747,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
.withHttpSpnegoKeytabKey( .withHttpSpnegoKeytabKey(
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
.at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); .at(webAppAddress);
String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf); String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
equals(proxyHostAndPort)) { equals(proxyHostAndPort)) {
AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService()); if (HAUtil.isHAEnabled(conf)) {
fetcher = new AppReportFetcher(conf);
} else {
fetcher = new AppReportFetcher(conf, getClientRMService());
}
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher); builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
@ -854,6 +853,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
transitionToActive(); transitionToActive();
} }
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart(); super.serviceStart();
} }
@ -864,6 +868,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (fetcher != null) {
fetcher.stop();
}
super.serviceStop(); super.serviceStop();
transitionToStandby(false); transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING); rmContext.setHAServiceState(HAServiceState.STOPPING);

View File

@ -19,21 +19,20 @@
package org.apache.hadoop.yarn.server.webproxy; package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
/** /**
* This class abstracts away how ApplicationReports are fetched. * This class abstracts away how ApplicationReports are fetched.
@ -50,16 +49,12 @@ public class AppReportFetcher {
*/ */
public AppReportFetcher(Configuration conf) { public AppReportFetcher(Configuration conf) {
this.conf = conf; this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf); try {
InetSocketAddress rmAddress = conf.getSocketAddr( applicationsManager = ClientRMProxy.createRMProxy(conf,
YarnConfiguration.RM_ADDRESS, ApplicationClientProtocol.class);
YarnConfiguration.DEFAULT_RM_ADDRESS, } catch (IOException e) {
YarnConfiguration.DEFAULT_RM_PORT); throw new YarnRuntimeException(e);
LOG.info("Connecting to ResourceManager at " + rmAddress); }
applicationsManager =
(ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class,
rmAddress, this.conf);
LOG.info("Connected to ResourceManager at " + rmAddress);
} }
/** /**
@ -91,4 +86,10 @@ public class AppReportFetcher {
.getApplicationReport(request); .getApplicationReport(request);
return response.getApplicationReport(); return response.getApplicationReport();
} }
public void stop() {
if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager);
}
}
} }

View File

@ -117,6 +117,9 @@ public class WebAppProxy extends AbstractService {
throw new YarnRuntimeException("Error stopping proxy web server",e); throw new YarnRuntimeException("Error stopping proxy web server",e);
} }
} }
if(this.fetcher != null) {
this.fetcher.stop();
}
super.serviceStop(); super.serviceStop();
} }