YARN-9581. Fixed yarn logs cli to access RM2.

Contributed by Prabhu Joseph
This commit is contained in:
Eric Yang 2019-06-06 16:41:58 -04:00
parent 8ca58efeec
commit cb9bc6e64c
5 changed files with 108 additions and 38 deletions

View File

@ -25,6 +25,7 @@ import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
@ -157,6 +158,9 @@ public class LogsCLI extends Configured implements Tool {
if (yarnClient != null) {
yarnClient.close();
}
if (webServiceClient != null) {
webServiceClient.destroy();
}
}
}
@ -420,24 +424,34 @@ public class LogsCLI extends Configured implements Tool {
}
protected List<JSONObject> getAMContainerInfoForRMWebService(
Configuration conf, String appId) throws ClientHandlerException,
Configuration conf, String appId) throws Exception {
return WebAppUtils.execOnActiveRM(conf, this::getAMContainerInfoFromRM,
appId);
}
private List<JSONObject> getAMContainerInfoFromRM(
String webAppAddress, String appId) throws ClientHandlerException,
UniformInterfaceException, JSONException {
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
WebResource webResource = webServiceClient.resource(webAppAddress);
ClientResponse response =
webResource.path("ws").path("v1").path("cluster").path("apps")
.path(appId).path("appattempts").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
JSONObject json =
response.getEntity(JSONObject.class).getJSONObject("appAttempts");
JSONArray requests = json.getJSONArray("appAttempt");
List<JSONObject> amContainersList = new ArrayList<JSONObject>();
for (int i = 0; i < requests.length(); i++) {
amContainersList.add(requests.getJSONObject(i));
ClientResponse response = null;
try {
Builder builder = webServiceClient.resource(webAppAddress)
.path("ws").path("v1").path("cluster")
.path("apps").path(appId).path("appattempts")
.accept(MediaType.APPLICATION_JSON);
response = builder.get(ClientResponse.class);
JSONObject json = response.getEntity(JSONObject.class)
.getJSONObject("appAttempts");
JSONArray requests = json.getJSONArray("appAttempt");
for (int j = 0; j < requests.length(); j++) {
amContainersList.add(requests.getJSONObject(j));
}
return amContainersList;
} finally {
if (response != null) {
response.close();
}
}
}
private List<JSONObject> getAMContainerInfoForAHSWebService(

View File

@ -21,13 +21,14 @@ package org.apache.hadoop.yarn.client.cli;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -131,18 +132,22 @@ public class SchedConfCLI extends Configured implements Tool {
return -1;
}
Client webServiceClient = Client.create();
WebResource webResource = webServiceClient
.resource(WebAppUtils.getRMWebAppURLWithScheme(getConf()));
ClientResponse response = null;
Configuration conf = getConf();
return WebAppUtils.execOnActiveRM(conf,
this::updateSchedulerConfOnRMNode, updateInfo);
}
private int updateSchedulerConfOnRMNode(String webAppAddress,
SchedConfUpdateInfo updateInfo) throws Exception {
Client webServiceClient = Client.create();
ClientResponse response = null;
try {
response =
webResource.path("ws").path("v1").path("cluster")
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON)
.entity(YarnWebServiceUtils.toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
.put(ClientResponse.class);
Builder builder = webServiceClient.resource(webAppAddress)
.path("ws").path("v1").path("cluster")
.path("scheduler-conf").accept(MediaType.APPLICATION_JSON);
builder.entity(YarnWebServiceUtils.toJson(updateInfo,
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON);
response = builder.put(ClientResponse.class);
if (response != null) {
if (response.getStatus() == Status.OK.getStatusCode()) {
System.out.println("Configuration changed successfully.");
@ -163,6 +168,7 @@ public class SchedConfCLI extends Configured implements Tool {
}
}
@VisibleForTesting
void addQueues(String args, SchedConfUpdateInfo updateInfo) {
if (args == null) {

View File

@ -90,8 +90,33 @@ public class WebAppUtils {
}
}
/**
* Runs a certain function against the active RM. The function's first
* argument is expected to be a string which contains the address of
* the RM being tried.
*/
public static <T, R> R execOnActiveRM(Configuration conf,
ThrowingBiFunction<String, T, R> func, T arg) throws Exception {
String rm1Address = getRMWebAppURLWithScheme(conf, 0);
try {
return func.apply(rm1Address, arg);
} catch (Exception e) {
if (HAUtil.isHAEnabled(conf)) {
String rm2Address = getRMWebAppURLWithScheme(conf, 1);
return func.apply(rm2Address, arg);
}
throw e;
}
}
/** A BiFunction which throws on Exception. */
@FunctionalInterface
public interface ThrowingBiFunction<T, U, R> {
R apply(T t, U u) throws Exception;
}
public static String getRMWebAppURLWithoutScheme(Configuration conf,
boolean isHAEnabled) {
boolean isHAEnabled, int haIdIndex) {
YarnConfiguration yarnConfig = new YarnConfiguration(conf);
// set RM_ID if we have not configure it.
if (isHAEnabled) {
@ -99,7 +124,7 @@ public class WebAppUtils {
if (rmId == null || rmId.isEmpty()) {
List<String> rmIds = new ArrayList<>(HAUtil.getRMHAIds(conf));
if (rmIds != null && !rmIds.isEmpty()) {
yarnConfig.set(YarnConfiguration.RM_HA_ID, rmIds.get(0));
yarnConfig.set(YarnConfiguration.RM_HA_ID, rmIds.get(haIdIndex));
}
}
}
@ -120,13 +145,19 @@ public class WebAppUtils {
}
}
public static String getRMWebAppURLWithScheme(Configuration conf,
int haIdIndex) {
return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme(
conf, HAUtil.isHAEnabled(conf), haIdIndex);
}
public static String getRMWebAppURLWithScheme(Configuration conf) {
return getHttpSchemePrefix(conf) + getRMWebAppURLWithoutScheme(
conf, HAUtil.isHAEnabled(conf));
conf, HAUtil.isHAEnabled(conf), 0);
}
public static String getRMWebAppURLWithoutScheme(Configuration conf) {
return getRMWebAppURLWithoutScheme(conf, false);
return getRMWebAppURLWithoutScheme(conf, false, 0);
}
public static String getRouterWebAppURLWithScheme(Configuration conf) {

View File

@ -21,7 +21,7 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.WebResource.Builder;
import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.json.JSONJAXBContext;
@ -53,16 +53,29 @@ public final class YarnWebServiceUtils {
public static JSONObject getNodeInfoFromRMWebService(Configuration conf,
String nodeId) throws ClientHandlerException,
UniformInterfaceException {
try {
return WebAppUtils.execOnActiveRM(conf,
YarnWebServiceUtils::getNodeInfoFromRM, nodeId);
} catch (Exception e) {
if (e instanceof ClientHandlerException) {
throw ((ClientHandlerException) e);
} else if (e instanceof UniformInterfaceException) {
throw ((UniformInterfaceException) e);
} else {
throw new RuntimeException(e);
}
}
}
private static JSONObject getNodeInfoFromRM(String webAppAddress,
String nodeId) throws ClientHandlerException, UniformInterfaceException {
Client webServiceClient = Client.create();
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
WebResource webResource = webServiceClient.resource(webAppAddress);
ClientResponse response = null;
try {
response = webResource.path("ws").path("v1").path("cluster")
.path("nodes").path(nodeId).accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
Builder builder = webServiceClient.resource(webAppAddress)
.path("ws").path("v1").path("cluster")
.path("nodes").path(nodeId).accept(MediaType.APPLICATION_JSON);
response = builder.get(ClientResponse.class);
return response.getEntity(JSONObject.class);
} finally {
if (response != null) {

View File

@ -56,6 +56,12 @@ public class TestYarnConfiguration {
conf2.set("yarn.resourcemanager.hostname.rm2", "40.40.40.40");
String rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2);
Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2);
rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2, 0);
Assert.assertEquals("http://30.30.30.30:8088", rmWebUrlinHA2);
rmWebUrlinHA2 = WebAppUtils.getRMWebAppURLWithScheme(conf2, 1);
Assert.assertEquals("http://40.40.40.40:8088", rmWebUrlinHA2);
}
@Test