YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager fail-over. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1556380 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-01-07 22:16:30 +00:00
parent 70cff9e2f0
commit 4931545f76
6 changed files with 117 additions and 27 deletions

View File

@ -200,6 +200,9 @@ Release 2.4.0 - UNRELEASED
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
app-attempts separately from apps. (Jian He via vinodkv)
YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager
fail-over. (Xuan Gong via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,13 +33,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -55,6 +62,8 @@ public class TestRMFailover extends ClientBaseWithFixes {
private Configuration conf;
private MiniYARNCluster cluster;
private ApplicationId fakeAppId;
private void setConfForRM(String rmId, String prefix, String value) {
conf.set(HAUtil.addSuffix(prefix, rmId), value);
@ -77,6 +86,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
@Before
public void setup() throws IOException {
fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@ -179,4 +189,67 @@ public class TestRMFailover extends ClientBaseWithFixes {
failover();
verifyConnections();
}
@Test
public void testWebAppProxyInStandAloneMode() throws YarnException,
InterruptedException, IOException {
WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
try {
conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
webAppProxyServer.init(conf);
// Start webAppProxyServer
Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState());
webAppProxyServer.start();
Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState());
URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId);
HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
.openConnection();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
explicitFailover();
verifyConnections();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
} finally {
webAppProxyServer.stop();
}
}
@Test
public void testEmbeddedWebAppProxy() throws YarnException,
InterruptedException, IOException {
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId);
HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
.openConnection();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
explicitFailover();
verifyConnections();
proxyConn.connect();
verifyExpectedException(proxyConn.getResponseMessage());
}
private void verifyExpectedException(String exceptionMessage){
assertTrue(exceptionMessage.contains(ApplicationNotFoundException.class
.getName()));
assertTrue(exceptionMessage
.contains("Application with id '" + fakeAppId + "' " +
"doesn't exist in RM."));
}
}

View File

@ -147,9 +147,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected QueueACLsManager queueACLsManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp;
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
private String webAppAddress;
/** End of Active services */
private Configuration conf;
@ -194,6 +197,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
createAndInitActiveServices();
webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf);
super.serviceInit(conf);
}
@ -437,22 +442,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
throw e;
}
}
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
DefaultMetricsSystem.shutdown();
@ -752,12 +747,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
.withHttpSpnegoKeytabKey(
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
.at(WebAppUtils.getRMWebAppURLWithoutScheme(conf));
.at(webAppAddress);
String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
equals(proxyHostAndPort)) {
AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
if (HAUtil.isHAEnabled(conf)) {
fetcher = new AppReportFetcher(conf);
} else {
fetcher = new AppReportFetcher(conf, getClientRMService());
}
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher);
String[] proxyParts = proxyHostAndPort.split(":");
@ -854,6 +853,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
transitionToActive();
}
startWepApp();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
int port = webApp.port();
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart();
}
@ -864,6 +868,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (fetcher != null) {
fetcher.stop();
}
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);

View File

@ -19,21 +19,20 @@
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
/**
* This class abstracts away how ApplicationReports are fetched.
@ -50,16 +49,12 @@ public class AppReportFetcher {
*/
public AppReportFetcher(Configuration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
InetSocketAddress rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
LOG.info("Connecting to ResourceManager at " + rmAddress);
applicationsManager =
(ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class,
rmAddress, this.conf);
LOG.info("Connected to ResourceManager at " + rmAddress);
try {
applicationsManager = ClientRMProxy.createRMProxy(conf,
ApplicationClientProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
/**
@ -91,4 +86,10 @@ public class AppReportFetcher {
.getApplicationReport(request);
return response.getApplicationReport();
}
public void stop() {
if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager);
}
}
}

View File

@ -117,6 +117,9 @@ public class WebAppProxy extends AbstractService {
throw new YarnRuntimeException("Error stopping proxy web server",e);
}
}
if(this.fetcher != null) {
this.fetcher.stop();
}
super.serviceStop();
}