YARN-4232. TopCLI console support for HA mode. Contributed by Bibin A Chundatt

This commit is contained in:
Naganarasimha 2016-09-17 09:52:39 +05:30
parent 501a77856d
commit ade7c2bc9c
3 changed files with 224 additions and 31 deletions

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.client.cli;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.EnumMap; import java.util.EnumMap;
@ -37,6 +40,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
@ -50,7 +57,12 @@ import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.commons.lang.time.DurationFormatUtils; import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@ -60,12 +72,17 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
public class TopCLI extends YarnCLI { public class TopCLI extends YarnCLI {
private static final String CLUSTER_INFO_URL = "/ws/v1/cluster/info";
private static final Log LOG = LogFactory.getLog(TopCLI.class); private static final Log LOG = LogFactory.getLog(TopCLI.class);
private String CLEAR = "\u001b[2J"; private String CLEAR = "\u001b[2J";
private String CLEAR_LINE = "\u001b[2K"; private String CLEAR_LINE = "\u001b[2K";
@ -742,18 +759,12 @@ public class TopCLI extends YarnCLI {
long getRMStartTime() { long getRMStartTime() {
try { try {
URL url = // connect with url
new URL("http://" URL url = getClusterUrl();
+ client.getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS) if (null == url) {
+ "/ws/v1/cluster/info"); return -1;
URLConnection conn = url.openConnection(); }
conn.connect(); JSONObject clusterInfo = getJSONObject(connect(url));
InputStream in = conn.getInputStream();
String encoding = conn.getContentEncoding();
encoding = encoding == null ? "UTF-8" : encoding;
String body = IOUtils.toString(in, encoding);
JSONObject obj = new JSONObject(body);
JSONObject clusterInfo = obj.getJSONObject("clusterInfo");
return clusterInfo.getLong("startedOn"); return clusterInfo.getLong("startedOn");
} catch (Exception e) { } catch (Exception e) {
LOG.error("Could not fetch RM start time", e); LOG.error("Could not fetch RM start time", e);
@ -761,6 +772,80 @@ public class TopCLI extends YarnCLI {
return -1; return -1;
} }
private JSONObject getJSONObject(URLConnection conn)
throws IOException, JSONException {
InputStream in = conn.getInputStream();
String encoding = conn.getContentEncoding();
encoding = encoding == null ? "UTF-8" : encoding;
String body = IOUtils.toString(in, encoding);
JSONObject obj = new JSONObject(body);
JSONObject clusterInfo = obj.getJSONObject("clusterInfo");
return clusterInfo;
}
private URL getClusterUrl() throws Exception {
URL url = null;
Configuration conf = getConf();
if (HAUtil.isHAEnabled(conf)) {
Collection<String> haids = HAUtil.getRMHAIds(conf);
for (String rmhid : haids) {
try {
url = getHAClusterUrl(conf, rmhid);
if (isActive(url)) {
break;
}
} catch (ConnectException e) {
// ignore and try second one when one of RM is down
}
}
} else {
url = new URL(
WebAppUtils.getRMWebAppURLWithScheme(conf) + CLUSTER_INFO_URL);
}
return url;
}
private boolean isActive(URL url) throws Exception {
URLConnection connect = connect(url);
JSONObject clusterInfo = getJSONObject(connect);
return clusterInfo.getString("haState").equals("ACTIVE");
}
@VisibleForTesting
public URL getHAClusterUrl(Configuration conf, String rmhid)
throws MalformedURLException {
return new URL(WebAppUtils.getHttpSchemePrefix(conf)
+ WebAppUtils.getResolvedRemoteRMWebAppURLWithoutScheme(conf,
YarnConfiguration.useHttps(conf) ? Policy.HTTPS_ONLY
: Policy.HTTP_ONLY,
rmhid)
+ CLUSTER_INFO_URL);
}
private URLConnection connect(URL url) throws Exception {
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
AuthenticatedURL authUrl;
SSLFactory clientSslFactory;
URLConnection connection;
// If https is chosen, configures SSL client.
if (YarnConfiguration.useHttps(getConf())) {
clientSslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, getConf());
clientSslFactory.init();
SSLSocketFactory sslSocktFact = clientSslFactory.createSSLSocketFactory();
authUrl =
new AuthenticatedURL(new KerberosAuthenticator(), clientSslFactory);
connection = authUrl.openConnection(url, token);
HttpsURLConnection httpsConn = (HttpsURLConnection) connection;
httpsConn.setSSLSocketFactory(sslSocktFact);
} else {
authUrl = new AuthenticatedURL(new KerberosAuthenticator());
connection = authUrl.openConnection(url, token);
}
connection.connect();
return connection;
}
String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) { String getHeader(QueueMetrics queueMetrics, NodesInformation nodes) {
StringBuilder ret = new StringBuilder(); StringBuilder ret = new StringBuilder();
String queue = "root"; String queue = "root";
@ -768,7 +853,10 @@ public class TopCLI extends YarnCLI {
queue = StringUtils.join(queues, ","); queue = StringUtils.join(queues, ",");
} }
long now = Time.now(); long now = Time.now();
long uptime = now - rmStartTime; long uptime = 0L;
if (rmStartTime != -1) {
uptime = now - rmStartTime;
}
long days = TimeUnit.MILLISECONDS.toDays(uptime); long days = TimeUnit.MILLISECONDS.toDays(uptime);
long hours = long hours =
TimeUnit.MILLISECONDS.toHours(uptime) TimeUnit.MILLISECONDS.toHours(uptime)

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.cli;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test class for TopCli.
*
*/
public class TestTopCLI {
private static final String RM1_NODE_ID = "rm1";
private static final String RM2_NODE_ID = "rm2";
private static List<String> dummyHostNames =
Arrays.asList("host1", "host2", "host3");
private static Map<String, String> savedStaticResolution = new HashMap<>();
@BeforeClass
public static void initializeDummyHostnameResolution() throws Exception {
String previousIpAddress;
for (String hostName : dummyHostNames) {
previousIpAddress = NetUtils.getStaticResolution(hostName);
if (null != previousIpAddress) {
savedStaticResolution.put(hostName, previousIpAddress);
}
NetUtils.addStaticResolution(hostName, "10.20.30.40");
}
}
@AfterClass
public static void restoreDummyHostnameResolution() throws Exception {
for (Map.Entry<String, String> hostnameToIpEntry : savedStaticResolution
.entrySet()) {
NetUtils.addStaticResolution(hostnameToIpEntry.getKey(),
hostnameToIpEntry.getValue());
}
}
@Test
public void testHAClusterInfoURL() throws IOException, InterruptedException {
TopCLI topcli = new TopCLI();
// http
String rm1Address = "host2:8088";
String rm2Address = "host3:8088";
Configuration conf = topcli.getConf();
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS + "." + RM1_NODE_ID,
rm1Address);
topcli.getConf().set(
YarnConfiguration.RM_WEBAPP_ADDRESS + "." + RM2_NODE_ID, rm2Address);
topcli.getConf().setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
topcli.getConf().set(YarnConfiguration.RM_HA_IDS,
RM1_NODE_ID + "," + RM2_NODE_ID);
URL clusterUrl = topcli.getHAClusterUrl(conf, RM1_NODE_ID);
Assert.assertEquals("http", clusterUrl.getProtocol());
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
clusterUrl = topcli.getHAClusterUrl(conf, RM2_NODE_ID);
Assert.assertEquals("http", clusterUrl.getProtocol());
Assert.assertEquals(rm2Address, clusterUrl.getAuthority());
// https
rm1Address = "host2:9088";
rm2Address = "host3:9088";
conf = topcli.getConf();
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + "." + RM1_NODE_ID,
rm1Address);
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + "." + RM2_NODE_ID,
rm2Address);
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
clusterUrl = topcli.getHAClusterUrl(conf, RM1_NODE_ID);
Assert.assertEquals("https", clusterUrl.getProtocol());
Assert.assertEquals(rm1Address, clusterUrl.getAuthority());
}
}

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RMHAUtils; import org.apache.hadoop.yarn.util.RMHAUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
@ -183,32 +182,32 @@ public class WebAppUtils {
public static String getResolvedRemoteRMWebAppURLWithoutScheme(Configuration conf, public static String getResolvedRemoteRMWebAppURLWithoutScheme(Configuration conf,
Policy httpPolicy) { Policy httpPolicy) {
InetSocketAddress address = null;
String rmId = null; String rmId = null;
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
// If HA enabled, pick one of the RM-IDs and rely on redirect to go to // If HA enabled, pick one of the RM-IDs and rely on redirect to go to
// the Active RM // the Active RM
rmId = (String) HAUtil.getRMHAIds(conf).toArray()[0]; rmId = (String) HAUtil.getRMHAIds(conf).toArray()[0];
} }
return getResolvedRemoteRMWebAppURLWithoutScheme(conf, httpPolicy, rmId);
}
public static String getResolvedRemoteRMWebAppURLWithoutScheme(
Configuration conf, Policy httpPolicy, String rmId) {
InetSocketAddress address = null;
if (httpPolicy == Policy.HTTPS_ONLY) { if (httpPolicy == Policy.HTTPS_ONLY) {
address = address = conf.getSocketAddr(
conf.getSocketAddr( rmId == null ? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
rmId == null : HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
? YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS rmId),
: HAUtil.addSuffix( YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, rmId), YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
} else { } else {
address = address = conf.getSocketAddr(
conf.getSocketAddr( rmId == null ? YarnConfiguration.RM_WEBAPP_ADDRESS
rmId == null : HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, rmId),
? YarnConfiguration.RM_WEBAPP_ADDRESS YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
: HAUtil.addSuffix( YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
YarnConfiguration.RM_WEBAPP_ADDRESS, rmId),
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
} }
return getResolvedAddress(address); return getResolvedAddress(address);
} }