YARN-1525. Web UI should redirect to active RM when HA is enabled. (Cindy Li via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575167 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-03-07 04:35:01 +00:00
parent f70da813f0
commit 09671ddb46
8 changed files with 260 additions and 6 deletions

View File

@ -253,6 +253,9 @@ Release 2.4.0 - UNRELEASED
YARN-1780. Improved logging in the Timeline client and server. (Zhijie Shen
via vinodkv)
YARN-1525. Web UI should redirect to active RM when HA is enabled. (Cindy Li
via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -26,13 +26,14 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
@ -252,4 +253,37 @@ public class TestRMFailover extends ClientBaseWithFixes {
.contains("Application with id '" + fakeAppId + "' " +
"doesn't exist in RM."));
}
@Test
public void testRMWebAppRedirect() throws YarnException,
InterruptedException, IOException {
cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 0, 1, 1);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.init(conf);
cluster.start();
getAdminService(0).transitionToActive(req);
String rm1Url = "http://0.0.0.0:18088";
String rm2Url = "http://0.0.0.0:28088";
String header = getHeader("Refresh", rm2Url);
assertTrue(header.contains("; url=" + rm1Url));
header = getHeader("Refresh", rm2Url + "/cluster/cluster");
assertEquals(null, header);
// Due to the limitation of MiniYARNCluster and dispatcher is a singleton,
// we couldn't add the test case after explicitFailover();
}
static String getHeader(String field, String url) {
String fieldHeader = null;
try {
Map<String, List<String>> map =
new URL(url).openConnection().getHeaderFields();
fieldHeader = map.get(field).get(0);
} catch (Exception e) {
// throw new RuntimeException(e);
}
return fieldHeader;
}
}

View File

@ -57,11 +57,11 @@ public class Dispatcher extends HttpServlet {
private transient final Injector injector;
private transient final Router router;
private transient final WebApp webApp;
protected transient final WebApp webApp;
private volatile boolean devMode = false;
@Inject
Dispatcher(WebApp webApp, Injector injector, Router router) {
protected Dispatcher(WebApp webApp, Injector injector, Router router) {
this.webApp = webApp;
this.injector = injector;
this.router = router;

View File

@ -44,7 +44,7 @@ import com.google.common.collect.Maps;
* Manages path info to controller#action routing.
*/
@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
class Router {
public class Router {
static final Logger LOG = LoggerFactory.getLogger(Router.class);
static final ImmutableList<String> EMPTY_LIST = ImmutableList.of();
static final CharMatcher SLASH = CharMatcher.is('/');

View File

@ -55,7 +55,7 @@ public abstract class WebApp extends ServletModule {
private volatile String name;
private volatile List<String> servePathSpecs = new ArrayList<String>();
// path to redirect to if user goes to "/"
// path to redirect to
private volatile String redirectPath;
private volatile String wsName;
private volatile Configuration conf;
@ -134,7 +134,9 @@ public abstract class WebApp extends ServletModule {
* more easily differentiate the different webapps.
* @param path the path to redirect to
*/
void setRedirectPath(String path) { this.redirectPath = path; }
protected void setRedirectPath(String path) {
this.redirectPath = path;
}
void setWebServices (String name) { this.wsName = name; }
@ -158,6 +160,10 @@ public abstract class WebApp extends ServletModule {
serve(path).with(Dispatcher.class);
}
configureRSServlets();
}
protected void configureRSServlets() {
// Add in the web services filters/serves if app has them.
// Using Jersey/guice integration module. If user has web services
// they must have also bound a default one in their webapp code.

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@Private
@Unstable
public class RMHAUtils {
public static String findActiveRMHAId(YarnConfiguration conf) {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
Collection<String> rmIds =
yarnConf.getStringCollection(YarnConfiguration.RM_HA_IDS);
for (String currentId : rmIds) {
yarnConf.set(YarnConfiguration.RM_HA_ID, currentId);
try {
HAServiceState haState = getHAState(yarnConf);
if (haState.equals(HAServiceState.ACTIVE)) {
return currentId;
}
} catch (Exception e) {
// Couldn't check if this RM is active. Do nothing. Worst case,
// we wouldn't find an Active RM and return null.
}
}
return null; // Couldn't find an Active RM
}
private static HAServiceState getHAState(YarnConfiguration yarnConf)
throws Exception {
HAServiceTarget haServiceTarget;
int rpcTimeoutForChecks =
yarnConf.getInt(CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT);
yarnConf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
yarnConf.get(YarnConfiguration.RM_PRINCIPAL, ""));
haServiceTarget = new RMHAServiceTarget(yarnConf);
HAServiceProtocol proto =
haServiceTarget.getProxy(yarnConf, rpcTimeoutForChecks);
HAServiceState haState = proto.getServiceStatus().getState();
return haState;
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.yarn.webapp.Dispatcher;
import org.apache.hadoop.yarn.webapp.Router;
import org.apache.hadoop.yarn.webapp.WebApp;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
@InterfaceAudience.LimitedPrivate({ "YARN", "MapReduce" })
@Singleton
public class RMDispatcher extends Dispatcher {
/**
*
*/
private static final long serialVersionUID = 1L;
@Inject
RMDispatcher(WebApp webApp, Injector injector, Router router) {
super(webApp, injector, router);
}
@Override
public void service(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
res.setCharacterEncoding("UTF-8");
String uri = HtmlQuoting.quoteHtmlChars(req.getRequestURI());
if (uri == null) {
uri = "/";
}
RMWebApp rmWebApp = (RMWebApp) webApp;
rmWebApp.checkIfStandbyRM();
if (rmWebApp.isStandby()
&& !uri.equals("/" + rmWebApp.name() + "/cluster")) {
String redirectPath = rmWebApp.getRedirectPath() + uri;
if (redirectPath != null && !redirectPath.isEmpty()) {
String redirectMsg =
"This is standby RM. Redirecting to the current active RM: "
+ redirectPath;
res.addHeader("Refresh", "3; url=" + redirectPath);
PrintWriter out = res.getWriter();
out.println(redirectMsg);
return;
}
}
super.service(req, res);
}
}

View File

@ -20,10 +20,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
import java.net.InetSocketAddress;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMHAUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.webapp.Dispatcher;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
@ -34,6 +40,7 @@ import org.apache.hadoop.yarn.webapp.YarnWebParams;
public class RMWebApp extends WebApp implements YarnWebParams {
private final ResourceManager rm;
private boolean standby = false;
public RMWebApp(ResourceManager rm) {
this.rm = rm;
@ -59,4 +66,59 @@ public class RMWebApp extends WebApp implements YarnWebParams {
route("/scheduler", RmController.class, "scheduler");
route(pajoin("/queue", QUEUE_NAME), RmController.class, "queue");
}
@Override
public void configureServlets() {
setup();
serve("/").with(RMDispatcher.class);
serve("/__stop").with(Dispatcher.class);
for (String path : super.getServePathSpecs()) {
serve(path).with(RMDispatcher.class);
}
configureRSServlets();
}
public void checkIfStandbyRM() {
standby = (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY);
}
public boolean isStandby() {
return standby;
}
@Override
public String getRedirectPath() {
if (standby) {
return buildRedirectPath();
} else
return super.getRedirectPath();
}
private String buildRedirectPath() {
// make a copy of the original configuration so not to mutate it. Also use
// an YarnConfiguration to force loading of yarn-site.xml.
YarnConfiguration yarnConf = new YarnConfiguration(rm.getConfig());
String activeRMHAId = RMHAUtils.findActiveRMHAId(yarnConf);
String path = "";
if (activeRMHAId != null) {
yarnConf.set(YarnConfiguration.RM_HA_ID, activeRMHAId);
InetSocketAddress sock = YarnConfiguration.useHttps(yarnConf)
? yarnConf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)
: yarnConf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
path = sock.getHostName() + ":" + Integer.toString(sock.getPort());
path = YarnConfiguration.useHttps(yarnConf)
? "https://" + path
: "http://" + path;
}
return path;
}
}