YARN-9044. LogsCLI should contact ATSv2 for -am option. Contributed by Rohith Sharma K S

This commit is contained in:
Suma Shivaprasad 2018-11-28 00:46:53 -08:00
parent 7dc272199f
commit b3a052d199
2 changed files with 133 additions and 9 deletions

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONArray;
@ -672,16 +674,31 @@ public class LogsCLI extends Configured implements Tool {
+ "and make sure the timeline server is running.");
} else {
try {
amContainersList = getAMContainerInfoForAHSWebService(conf, appId);
if (amContainersList != null && !amContainersList.isEmpty()) {
getAMContainerLists = true;
for (JSONObject amContainer : amContainersList) {
ContainerLogsRequest amRequest = new ContainerLogsRequest(
request);
amRequest.setContainerId(
amContainer.getString("amContainerId"));
requests.add(amRequest);
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
try {
amContainersList =
getAMContainerInfoFromTimelineReader(conf, appId);
getAMContainerLists =
createContainerLogsRequestForMasterContainer(requests,
request, amContainersList,
AppAttemptMetricsConstants.MASTER_CONTAINER_INFO);
} catch (Exception e) {
System.err.println(
"Unable to get AM container informations from "
+ "TimelineReader for the application:" + appId);
if (YarnConfiguration.timelineServiceV1Enabled(conf)
|| YarnConfiguration.timelineServiceV15Enabled(conf)) {
getAMContainerLists =
getAMContainerInfoForAHSWebService(conf, appId, requests,
request);
} else {
throw e;
}
}
} else {
getAMContainerLists =
getAMContainerInfoForAHSWebService(conf, appId, requests,
request);
}
} catch (Exception e) {
errorMessage.append(e.getMessage());
@ -739,6 +756,76 @@ public class LogsCLI extends Configured implements Tool {
return 0;
}
private boolean getAMContainerInfoForAHSWebService(Configuration conf,
String appId, List<ContainerLogsRequest> requests,
ContainerLogsRequest request) throws JSONException {
List<JSONObject> amContainersList =
getAMContainerInfoForAHSWebService(conf, appId);
return createContainerLogsRequestForMasterContainer(requests, request,
amContainersList, "amContainerId");
}
private boolean createContainerLogsRequestForMasterContainer(
List<ContainerLogsRequest> requests, ContainerLogsRequest request,
List<JSONObject> amContainersList, String masterContainerInfo)
throws JSONException {
boolean getAMContainerLists = false;
if (amContainersList != null && !amContainersList.isEmpty()) {
getAMContainerLists = true;
for (JSONObject amContainer : amContainersList) {
ContainerLogsRequest amRequest = new ContainerLogsRequest(request);
amRequest.setContainerId(amContainer.getString(masterContainerInfo));
requests.add(amRequest);
}
}
return getAMContainerLists;
}
private List<JSONObject> getAMContainerInfoFromTimelineReader(
Configuration conf, String appId)
throws IOException, ClientHandlerException, UniformInterfaceException,
JSONException {
ClientResponse response = getClientResponseFromTimelineReader(conf, appId);
JSONArray appAttemptEntities = response.getEntity(JSONArray.class);
List<JSONObject> amContainersList = new ArrayList<JSONObject>();
for (int i = 0; i < appAttemptEntities.length(); i++) {
JSONObject appAttemptEntity = appAttemptEntities.getJSONObject(i);
JSONObject infoField = appAttemptEntity.getJSONObject("info");
amContainersList.add(infoField);
}
Collections.reverse(amContainersList);
return amContainersList;
}
protected ClientResponse getClientResponseFromTimelineReader(
Configuration conf, String appId) throws IOException {
String webAppAddress = WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils
.getTimelineReaderWebAppURLWithoutScheme(conf);
WebResource webResource = webServiceClient.resource(webAppAddress);
ClientResponse response =
webResource.path("ws").path("v2").path("timeline").path("clusters")
.path(conf.get(YarnConfiguration.RM_CLUSTER_ID)).path("apps")
.path(appId).path("entities")
.path(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString())
.queryParam("fields", "INFO").accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
if (response == null
|| response.getStatusInfo().getStatusCode() != ClientResponse.Status.OK
.getStatusCode()) {
String msg =
"Response from the timeline reader server is " + ((response == null) ?
"null" :
"not successful," + " HTTP error code: " + response.getStatus()
+ ", Server response:\n" + response.getEntity(String.class));
System.out.println(msg);
throw new IOException(msg);
}
return response;
}
private void outputAMContainerLogs(ContainerLogsRequest request,
Configuration conf, LogCLIHelpers logCliHelper, boolean useRegex,
boolean ignoreSizeLimit) throws Exception {

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
@ -181,6 +182,42 @@ public class TestLogsCLI {
.contains("exceeds the number of AM containers"));
}
@Test
public void testAMContainerInfoFetchFromTimelineReader() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "2.0f");
YarnClient mockYarnClient =
createMockYarnClient(YarnApplicationState.FINISHED,
UserGroupInformation.getCurrentUser().getShortUserName());
LogsCLI cli = spy(new LogsCLIForTest(mockYarnClient));
String appInfoEntity =
"[{\"metrics\":[],\"events\":[],\"createdtime\":1542273848613,\"idpref"
+ "ix\":9223372036854775806,\"id\":\"appattempt_1542271570060_0002_"
+ "000001\",\"type\":\"YARN_APPLICATION_ATTEMPT\",\"info\":{\"YARN_"
+ "APPLICATION_ATTEMPT_MASTER_CONTAINER\":\"container_e01_154227157"
+ "0060_0002_01_000001\"},\"configs\":{},\"isrelatedto\":{},\"relat"
+ "esto\":{}}]";
JSONArray obj = new JSONArray(appInfoEntity);
ClientResponse response = mock(ClientResponse.class);
doReturn(obj).when(response).getEntity(JSONArray.class);
doReturn(response).when(cli)
.getClientResponseFromTimelineReader(any(Configuration.class),
any(String.class));
doThrow(new RuntimeException()).when(cli)
.getAMContainerInfoForRMWebService(any(Configuration.class),
any(String.class));
cli.setConf(conf);
int exitCode = cli.run(
new String[] {"-applicationId", "application_1542271570060_0002",
"-am", "1" });
assertTrue(exitCode == 0);
}
@Test(timeout = 5000l)
public void testUnknownApplicationId() throws Exception {
Configuration conf = new YarnConfiguration();