YARN-7269. Tracking URL in the app state does not get redirected to ApplicationMaster for Running applications. (wangda via Jian He)

Change-Id: I582566e09d5ef67e9cc9e9805231ccbabe57c3d0
This commit is contained in:
Wangda Tan 2017-10-03 12:18:57 -07:00
parent 28e05dc1da
commit 175ff0585d
4 changed files with 75 additions and 63 deletions

View File

@ -18,21 +18,26 @@
package org.apache.hadoop.yarn.server.webproxy.amfilter; package org.apache.hadoop.yarn.server.webproxy.amfilter;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.FilterContainer;
import org.apache.hadoop.http.FilterInitializer;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.FilterContainer;
import org.apache.hadoop.http.FilterInitializer;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
public class AmFilterInitializer extends FilterInitializer { public class AmFilterInitializer extends FilterInitializer {
private static final String FILTER_NAME = "AM_PROXY_FILTER"; private static final String FILTER_NAME = "AM_PROXY_FILTER";
private static final String FILTER_CLASS = AmIpFilter.class.getCanonicalName(); private static final String FILTER_CLASS = AmIpFilter.class.getCanonicalName();
public static final String RM_HA_URLS = "RM_HA_URLS";
@Override @Override
public void initFilter(FilterContainer container, Configuration conf) { public void initFilter(FilterContainer container, Configuration conf) {
@ -55,6 +60,29 @@ public class AmFilterInitializer extends FilterInitializer {
sb.setLength(sb.length() - 1); sb.setLength(sb.length() - 1);
params.put(AmIpFilter.PROXY_URI_BASES, sb.toString()); params.put(AmIpFilter.PROXY_URI_BASES, sb.toString());
container.addFilter(FILTER_NAME, FILTER_CLASS, params); container.addFilter(FILTER_NAME, FILTER_CLASS, params);
// Handle RM HA urls
List<String> urls = new ArrayList<>();
// Include yarn-site.xml in the classpath
YarnConfiguration yarnConf = new YarnConfiguration(conf);
for (String rmId : getRmIds(yarnConf)) {
String url = getUrlByRmId(yarnConf, rmId);
urls.add(url);
}
params.put(RM_HA_URLS, StringUtils.join(",", urls));
}
private Collection<String> getRmIds(Configuration conf) {
return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
}
private String getUrlByRmId(Configuration conf, String rmId) {
String addressPropertyPrefix = YarnConfiguration.useHttps(conf) ?
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS :
YarnConfiguration.RM_WEBAPP_ADDRESS;
String host = conf.get(HAUtil.addSuffix(addressPropertyPrefix, rmId));
return host;
} }
@VisibleForTesting @VisibleForTesting

View File

@ -18,17 +18,12 @@
package org.apache.hadoop.yarn.server.webproxy.amfilter; package org.apache.hadoop.yarn.server.webproxy.amfilter;
import java.io.IOException; import com.google.common.annotations.VisibleForTesting;
import java.net.InetAddress; import org.apache.hadoop.classification.InterfaceAudience.Public;
import java.net.MalformedURLException; import org.apache.hadoop.yarn.server.webproxy.ProxyUtils;
import java.net.URL; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
import java.net.UnknownHostException; import org.slf4j.Logger;
import java.net.HttpURLConnection; import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import javax.servlet.Filter; import javax.servlet.Filter;
import javax.servlet.FilterChain; import javax.servlet.FilterChain;
@ -39,15 +34,16 @@ import javax.servlet.ServletResponse;
import javax.servlet.http.Cookie; import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting; import java.net.HttpURLConnection;
import org.apache.hadoop.classification.InterfaceAudience.Public; import java.net.InetAddress;
import org.apache.hadoop.yarn.conf.HAUtil; import java.net.MalformedURLException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import java.net.URL;
import org.apache.hadoop.yarn.server.webproxy.ProxyUtils; import java.net.UnknownHostException;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; import java.util.HashMap;
import org.slf4j.Logger; import java.util.HashSet;
import org.slf4j.LoggerFactory; import java.util.Map;
import java.util.Set;
@Public @Public
public class AmIpFilter implements Filter { public class AmIpFilter implements Filter {
@ -70,6 +66,7 @@ public class AmIpFilter implements Filter {
private long lastUpdate; private long lastUpdate;
@VisibleForTesting @VisibleForTesting
Map<String, String> proxyUriBases; Map<String, String> proxyUriBases;
String rmUrls[] = null;
@Override @Override
public void init(FilterConfig conf) throws ServletException { public void init(FilterConfig conf) throws ServletException {
@ -95,6 +92,10 @@ public class AmIpFilter implements Filter {
} }
} }
} }
if (conf.getInitParameter(AmFilterInitializer.RM_HA_URLS) != null) {
rmUrls = conf.getInitParameter(AmFilterInitializer.RM_HA_URLS).split(",");
}
} }
protected Set<String> getProxyAddresses() throws ServletException { protected Set<String> getProxyAddresses() throws ServletException {
@ -196,13 +197,11 @@ public class AmIpFilter implements Filter {
if (proxyUriBases.size() == 1) { if (proxyUriBases.size() == 1) {
// external proxy or not RM HA // external proxy or not RM HA
addr = proxyUriBases.values().iterator().next(); addr = proxyUriBases.values().iterator().next();
} else { } else if (rmUrls != null) {
// RM HA for (String url : rmUrls) {
YarnConfiguration conf = new YarnConfiguration(); String host = proxyUriBases.get(url);
for (String rmId : getRmIds(conf)) { if (isValidUrl(host)) {
String url = getUrlByRmId(conf, rmId); addr = host;
if (isValidUrl(url)) {
addr = url;
break; break;
} }
} }
@ -215,26 +214,12 @@ public class AmIpFilter implements Filter {
return addr; return addr;
} }
@VisibleForTesting
Collection<String> getRmIds(YarnConfiguration conf) {
return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
}
@VisibleForTesting
String getUrlByRmId(YarnConfiguration conf, String rmId) {
String addressPropertyPrefix = YarnConfiguration.useHttps(conf) ?
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS :
YarnConfiguration.RM_WEBAPP_ADDRESS;
String host = conf.get(HAUtil.addSuffix(addressPropertyPrefix, rmId));
return proxyUriBases.get(host);
}
@VisibleForTesting @VisibleForTesting
public boolean isValidUrl(String url) { public boolean isValidUrl(String url) {
boolean isValid = false; boolean isValid = false;
try { try {
HttpURLConnection conn = HttpURLConnection conn = (HttpURLConnection) new URL(url)
(HttpURLConnection) new URL(url).openConnection(); .openConnection();
conn.connect(); conn.connect();
isValid = conn.getResponseCode() == HttpURLConnection.HTTP_OK; isValid = conn.getResponseCode() == HttpURLConnection.HTTP_OK;
} catch (Exception e) { } catch (Exception e) {

View File

@ -152,14 +152,7 @@ public class TestAmFilter {
spy.proxyUriBases = new HashMap<>(); spy.proxyUriBases = new HashMap<>();
spy.proxyUriBases.put(rm1, rm1Url); spy.proxyUriBases.put(rm1, rm1Url);
spy.proxyUriBases.put(rm2, rm2Url); spy.proxyUriBases.put(rm2, rm2Url);
spy.rmUrls = new String[] { rm1, rm2 };
Collection<String> rmIds = new ArrayList<>(Arrays.asList(rm1, rm2));
Mockito.doReturn(rmIds).when(spy).getRmIds(
Mockito.any(YarnConfiguration.class));
Mockito.doReturn(rm1Url).when(spy).getUrlByRmId(
Mockito.any(YarnConfiguration.class), Mockito.eq(rm2));
Mockito.doReturn(rm2Url).when(spy)
.getUrlByRmId(Mockito.any(YarnConfiguration.class), Mockito.eq(rm1));
// Stub "isValidUrl" and returns false for rm1, returns true for rm2. // Stub "isValidUrl" and returns false for rm1, returns true for rm2.
Mockito.doReturn(false).when(spy).isValidUrl(Mockito.eq(rm1Url)); Mockito.doReturn(false).when(spy).isValidUrl(Mockito.eq(rm1Url));

View File

@ -53,10 +53,11 @@ public class TestAmFilterInitializer extends TestCase {
AmFilterInitializer afi = new MockAmFilterInitializer(); AmFilterInitializer afi = new MockAmFilterInitializer();
assertNull(con.givenParameters); assertNull(con.givenParameters);
afi.initFilter(con, conf); afi.initFilter(con, conf);
assertEquals(2, con.givenParameters.size()); assertEquals(3, con.givenParameters.size());
assertEquals("host1", con.givenParameters.get(AmIpFilter.PROXY_HOSTS)); assertEquals("host1", con.givenParameters.get(AmIpFilter.PROXY_HOSTS));
assertEquals("http://host1:1000/foo", assertEquals("http://host1:1000/foo",
con.givenParameters.get(AmIpFilter.PROXY_URI_BASES)); con.givenParameters.get(AmIpFilter.PROXY_URI_BASES));
assertEquals("", con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
// Check a single RM_WEBAPP_ADDRESS // Check a single RM_WEBAPP_ADDRESS
con = new MockFilterContainer(); con = new MockFilterContainer();
@ -65,10 +66,11 @@ public class TestAmFilterInitializer extends TestCase {
afi = new MockAmFilterInitializer(); afi = new MockAmFilterInitializer();
assertNull(con.givenParameters); assertNull(con.givenParameters);
afi.initFilter(con, conf); afi.initFilter(con, conf);
assertEquals(2, con.givenParameters.size()); assertEquals(3, con.givenParameters.size());
assertEquals("host2", con.givenParameters.get(AmIpFilter.PROXY_HOSTS)); assertEquals("host2", con.givenParameters.get(AmIpFilter.PROXY_HOSTS));
assertEquals("http://host2:2000/foo", assertEquals("http://host2:2000/foo",
con.givenParameters.get(AmIpFilter.PROXY_URI_BASES)); con.givenParameters.get(AmIpFilter.PROXY_URI_BASES));
assertEquals("", con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
// Check multiple RM_WEBAPP_ADDRESSes (RM HA) // Check multiple RM_WEBAPP_ADDRESSes (RM HA)
con = new MockFilterContainer(); con = new MockFilterContainer();
@ -81,7 +83,7 @@ public class TestAmFilterInitializer extends TestCase {
afi = new MockAmFilterInitializer(); afi = new MockAmFilterInitializer();
assertNull(con.givenParameters); assertNull(con.givenParameters);
afi.initFilter(con, conf); afi.initFilter(con, conf);
assertEquals(2, con.givenParameters.size()); assertEquals(3, con.givenParameters.size());
String[] proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS) String[] proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS)
.split(AmIpFilter.PROXY_HOSTS_DELIMITER); .split(AmIpFilter.PROXY_HOSTS_DELIMITER);
assertEquals(3, proxyHosts.length); assertEquals(3, proxyHosts.length);
@ -96,6 +98,8 @@ public class TestAmFilterInitializer extends TestCase {
assertEquals("http://host2:2000/foo", proxyBases[0]); assertEquals("http://host2:2000/foo", proxyBases[0]);
assertEquals("http://host3:3000/foo", proxyBases[1]); assertEquals("http://host3:3000/foo", proxyBases[1]);
assertEquals("http://host4:4000/foo", proxyBases[2]); assertEquals("http://host4:4000/foo", proxyBases[2]);
assertEquals("host2:2000,host3:3000,host4:4000",
con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
// Check multiple RM_WEBAPP_ADDRESSes (RM HA) with HTTPS // Check multiple RM_WEBAPP_ADDRESSes (RM HA) with HTTPS
con = new MockFilterContainer(); con = new MockFilterContainer();
@ -109,7 +113,7 @@ public class TestAmFilterInitializer extends TestCase {
afi = new MockAmFilterInitializer(); afi = new MockAmFilterInitializer();
assertNull(con.givenParameters); assertNull(con.givenParameters);
afi.initFilter(con, conf); afi.initFilter(con, conf);
assertEquals(2, con.givenParameters.size()); assertEquals(3, con.givenParameters.size());
proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS) proxyHosts = con.givenParameters.get(AmIpFilter.PROXY_HOSTS)
.split(AmIpFilter.PROXY_HOSTS_DELIMITER); .split(AmIpFilter.PROXY_HOSTS_DELIMITER);
assertEquals(2, proxyHosts.length); assertEquals(2, proxyHosts.length);
@ -122,6 +126,8 @@ public class TestAmFilterInitializer extends TestCase {
Arrays.sort(proxyBases); Arrays.sort(proxyBases);
assertEquals("https://host5:5000/foo", proxyBases[0]); assertEquals("https://host5:5000/foo", proxyBases[0]);
assertEquals("https://host6:6000/foo", proxyBases[1]); assertEquals("https://host6:6000/foo", proxyBases[1]);
assertEquals("host5:5000,host6:6000",
con.givenParameters.get(AmFilterInitializer.RM_HA_URLS));
} }
@Test @Test