YARN-8303. YarnClient should contact TimelineReader for application/attempt/container report.
(cherry picked from commit ee3355be3c
)
This commit is contained in:
parent
1dd2cf296d
commit
095635d984
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AHSClientImpl;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AHSv2ClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -46,8 +47,13 @@ public abstract class AHSClient extends AbstractService {
|
|||
*/
|
||||
@Public
|
||||
public static AHSClient createAHSClient() {
|
||||
AHSClient client = new AHSClientImpl();
|
||||
return client;
|
||||
return new AHSClientImpl();
|
||||
}
|
||||
|
||||
@InterfaceStability.Evolving
|
||||
@Public
|
||||
public static AHSClient createAHSv2Client() {
|
||||
return new AHSv2ClientImpl();
|
||||
}
|
||||
|
||||
@Private
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.client.api.AHSClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineReaderClient;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineEntityV2Converter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This class provides Application History client implementation which uses
|
||||
* ATS v2 as backend.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class AHSv2ClientImpl extends AHSClient {
|
||||
private TimelineReaderClient readerClient;
|
||||
|
||||
public AHSv2ClientImpl() {
|
||||
super(AHSv2ClientImpl.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) {
|
||||
readerClient = TimelineReaderClient.createTimelineReaderClient();
|
||||
readerClient.init(conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void setReaderClient(TimelineReaderClient readerClient) {
|
||||
this.readerClient = readerClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStart() {
|
||||
readerClient.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceStop() {
|
||||
readerClient.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||
throws YarnException, IOException {
|
||||
TimelineEntity entity = readerClient.getApplicationEntity(
|
||||
appId, "ALL", null);
|
||||
return TimelineEntityV2Converter.convertToApplicationReport(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationReport> getApplications()
|
||||
throws YarnException, IOException {
|
||||
throw new UnsupportedOperationException("ATSv2.0 doesn't support retrieving"
|
||||
+ " ALL application entities.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationAttemptReport getApplicationAttemptReport(
|
||||
ApplicationAttemptId applicationAttemptId)
|
||||
throws YarnException, IOException {
|
||||
TimelineEntity entity = readerClient.getApplicationAttemptEntity(
|
||||
applicationAttemptId, "ALL", null);
|
||||
return TimelineEntityV2Converter.convertToApplicationAttemptReport(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationAttemptReport> getApplicationAttempts(
|
||||
ApplicationId applicationId) throws YarnException, IOException {
|
||||
List<TimelineEntity> entities = readerClient.getApplicationAttemptEntities(
|
||||
applicationId, "ALL", null, 0, null);
|
||||
List<ApplicationAttemptReport> appAttemptReports =
|
||||
new ArrayList<>();
|
||||
if (entities != null && !entities.isEmpty()) {
|
||||
for (TimelineEntity entity : entities) {
|
||||
ApplicationAttemptReport container =
|
||||
TimelineEntityV2Converter.convertToApplicationAttemptReport(
|
||||
entity);
|
||||
appAttemptReports.add(container);
|
||||
}
|
||||
}
|
||||
return appAttemptReports;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerReport getContainerReport(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
TimelineEntity entity = readerClient.getContainerEntity(containerId,
|
||||
"ALL", null);
|
||||
return TimelineEntityV2Converter.convertToContainerReport(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerReport> getContainers(ApplicationAttemptId
|
||||
applicationAttemptId) throws YarnException, IOException {
|
||||
ApplicationId appId = applicationAttemptId.getApplicationId();
|
||||
Map<String, String> filters = new HashMap<>();
|
||||
filters.put("infofilters", "SYSTEM_INFO_PARENT_ENTITY eq {\"id\":\"" +
|
||||
applicationAttemptId.toString() +
|
||||
"\",\"type\":\"YARN_APPLICATION_ATTEMPT\"}");
|
||||
List<TimelineEntity> entities = readerClient.getContainerEntities(
|
||||
appId, "ALL", filters, 0, null);
|
||||
List<ContainerReport> containers =
|
||||
new ArrayList<>();
|
||||
if (entities != null && !entities.isEmpty()) {
|
||||
for (TimelineEntity entity : entities) {
|
||||
ContainerReport container =
|
||||
TimelineEntityV2Converter.convertToContainerReport(
|
||||
entity);
|
||||
containers.add(container);
|
||||
}
|
||||
}
|
||||
return containers;
|
||||
}
|
||||
}
|
|
@ -144,6 +144,7 @@ public class YarnClientImpl extends YarnClient {
|
|||
private long asyncApiPollIntervalMillis;
|
||||
private long asyncApiPollTimeoutMillis;
|
||||
protected AHSClient historyClient;
|
||||
private AHSClient ahsV2Client;
|
||||
private boolean historyServiceEnabled;
|
||||
protected volatile TimelineClient timelineClient;
|
||||
@VisibleForTesting
|
||||
|
@ -154,6 +155,8 @@ public class YarnClientImpl extends YarnClient {
|
|||
protected boolean timelineServiceBestEffort;
|
||||
private boolean loadResourceTypesFromServer;
|
||||
|
||||
private boolean timelineV2ServiceEnabled;
|
||||
|
||||
private static final String ROOT = "root";
|
||||
|
||||
public YarnClientImpl() {
|
||||
|
@ -183,6 +186,10 @@ public class YarnClientImpl extends YarnClient {
|
|||
timelineService = TimelineUtils.buildTimelineTokenService(conf);
|
||||
}
|
||||
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
||||
timelineV2ServiceEnabled = true;
|
||||
}
|
||||
|
||||
// The AHSClientService is enabled by default when we start the
|
||||
// TimelineServer which means we are able to get history information
|
||||
// for applications/applicationAttempts/containers by using ahsClient
|
||||
|
@ -195,6 +202,11 @@ public class YarnClientImpl extends YarnClient {
|
|||
historyClient.init(conf);
|
||||
}
|
||||
|
||||
if (timelineV2ServiceEnabled) {
|
||||
ahsV2Client = AHSClient.createAHSv2Client();
|
||||
ahsV2Client.init(conf);
|
||||
}
|
||||
|
||||
timelineServiceBestEffort = conf.getBoolean(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT);
|
||||
|
@ -218,6 +230,9 @@ public class YarnClientImpl extends YarnClient {
|
|||
if (historyServiceEnabled) {
|
||||
historyClient.start();
|
||||
}
|
||||
if (timelineV2ServiceEnabled) {
|
||||
ahsV2Client.start();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
|
@ -239,6 +254,9 @@ public class YarnClientImpl extends YarnClient {
|
|||
if (historyServiceEnabled) {
|
||||
historyClient.stop();
|
||||
}
|
||||
if (timelineV2ServiceEnabled) {
|
||||
ahsV2Client.stop();
|
||||
}
|
||||
if (timelineClient != null) {
|
||||
timelineClient.stop();
|
||||
}
|
||||
|
@ -511,6 +529,14 @@ public class YarnClientImpl extends YarnClient {
|
|||
request.setApplicationId(appId);
|
||||
response = rmClient.getApplicationReport(request);
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
if (timelineV2ServiceEnabled) {
|
||||
try {
|
||||
return ahsV2Client.getApplicationReport(appId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to fetch application report from "
|
||||
+ "ATS v2", ex);
|
||||
}
|
||||
}
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
|
@ -721,15 +747,24 @@ public class YarnClientImpl extends YarnClient {
|
|||
.getApplicationAttemptReport(request);
|
||||
return response.getApplicationAttemptReport();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
if (timelineV2ServiceEnabled) {
|
||||
try {
|
||||
return ahsV2Client.getApplicationAttemptReport(appAttemptId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to fetch application attempt report from "
|
||||
+ "ATS v2", ex);
|
||||
}
|
||||
}
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getApplicationAttemptReport(appAttemptId);
|
||||
}
|
||||
}
|
||||
|
@ -745,15 +780,23 @@ public class YarnClientImpl extends YarnClient {
|
|||
.getApplicationAttempts(request);
|
||||
return response.getApplicationAttemptList();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
if (timelineV2ServiceEnabled) {
|
||||
try {
|
||||
return ahsV2Client.getApplicationAttempts(appId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to fetch application attempts from "
|
||||
+ "ATS v2", ex);
|
||||
}
|
||||
}
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getApplicationAttempts(appId);
|
||||
}
|
||||
}
|
||||
|
@ -769,16 +812,24 @@ public class YarnClientImpl extends YarnClient {
|
|||
.getContainerReport(request);
|
||||
return response.getContainerReport();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class
|
||||
&& e.getClass() != ContainerNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
if (timelineV2ServiceEnabled) {
|
||||
try {
|
||||
return ahsV2Client.getContainerReport(containerId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("Failed to fetch container report from "
|
||||
+ "ATS v2", ex);
|
||||
}
|
||||
}
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getContainerReport(containerId);
|
||||
}
|
||||
}
|
||||
|
@ -797,34 +848,30 @@ public class YarnClientImpl extends YarnClient {
|
|||
GetContainersResponse response = rmClient.getContainers(request);
|
||||
containersForAttempt.addAll(response.getContainerList());
|
||||
} catch (YarnException e) {
|
||||
if (e.getClass() != ApplicationNotFoundException.class
|
||||
|| !historyServiceEnabled) {
|
||||
// If Application is not in RM and history service is enabled then we
|
||||
// need to check with history service else throw exception.
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
if (!historyServiceEnabled && !timelineV2ServiceEnabled) {
|
||||
// if both history server and ATSv2 are not enabled throw exception.
|
||||
throw e;
|
||||
}
|
||||
appNotFoundInRM = true;
|
||||
}
|
||||
|
||||
if (historyServiceEnabled) {
|
||||
// Check with AHS even if found in RM because to capture info of finished
|
||||
// containers also
|
||||
List<ContainerReport> containersListFromAHS = null;
|
||||
try {
|
||||
containersListFromAHS =
|
||||
historyClient.getContainers(applicationAttemptId);
|
||||
getContainerReportFromHistory(applicationAttemptId);
|
||||
} catch (IOException e) {
|
||||
// History service access might be enabled but system metrics publisher
|
||||
// is disabled hence app not found exception is possible
|
||||
if (appNotFoundInRM) {
|
||||
// app not found in bothM and RM then propagate the exception.
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
|
||||
// remove duplicates
|
||||
|
||||
Set<ContainerId> containerIdsToBeKeptFromAHS =
|
||||
new HashSet<ContainerId>();
|
||||
Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
|
||||
|
@ -858,10 +905,31 @@ public class YarnClientImpl extends YarnClient {
|
|||
containersForAttempt.addAll(containersListFromAHS);
|
||||
}
|
||||
}
|
||||
}
|
||||
return containersForAttempt;
|
||||
}
|
||||
|
||||
private List<ContainerReport> getContainerReportFromHistory(
|
||||
ApplicationAttemptId applicationAttemptId)
|
||||
throws IOException, YarnException {
|
||||
List<ContainerReport> containersListFromAHS = null;
|
||||
if (timelineV2ServiceEnabled) {
|
||||
try {
|
||||
containersListFromAHS = ahsV2Client.getContainers(applicationAttemptId);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Got an error while fetching container report from ATSv2", e);
|
||||
if (historyServiceEnabled) {
|
||||
containersListFromAHS = historyClient.getContainers(
|
||||
applicationAttemptId);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} else if (historyServiceEnabled) {
|
||||
containersListFromAHS = historyClient.getContainers(applicationAttemptId);
|
||||
}
|
||||
return containersListFromAHS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveApplicationAcrossQueues(ApplicationId appId,
|
||||
String queue) throws YarnException, IOException {
|
||||
|
|
|
@ -96,6 +96,7 @@ public class LogsCLI extends Configured implements Tool {
|
|||
|
||||
private static final String CONTAINER_ID_OPTION = "containerId";
|
||||
private static final String APPLICATION_ID_OPTION = "applicationId";
|
||||
private static final String CLUSTER_ID_OPTION = "clusterId";
|
||||
private static final String NODE_ADDRESS_OPTION = "nodeAddress";
|
||||
private static final String APP_OWNER_OPTION = "appOwner";
|
||||
private static final String AM_CONTAINER_OPTION = "am";
|
||||
|
@ -134,7 +135,6 @@ public class LogsCLI extends Configured implements Tool {
|
|||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
try {
|
||||
yarnClient = createYarnClient();
|
||||
webServiceClient = new Client(new URLConnectionClientHandler(
|
||||
new HttpURLConnectionFactory() {
|
||||
@Override
|
||||
|
@ -171,6 +171,7 @@ public class LogsCLI extends Configured implements Tool {
|
|||
}
|
||||
CommandLineParser parser = new GnuParser();
|
||||
String appIdStr = null;
|
||||
String clusterIdStr = null;
|
||||
String containerIdStr = null;
|
||||
String nodeAddress = null;
|
||||
String appOwner = null;
|
||||
|
@ -207,6 +208,10 @@ public class LogsCLI extends Configured implements Tool {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
if (commandLine.hasOption(CLUSTER_ID_OPTION)) {
|
||||
clusterIdStr = commandLine.getOptionValue(CLUSTER_ID_OPTION);
|
||||
getConf().set(YarnConfiguration.RM_CLUSTER_ID, clusterIdStr);
|
||||
}
|
||||
if (commandLine.hasOption(PER_CONTAINER_LOG_FILES_OPTION)) {
|
||||
logFiles = commandLine.getOptionValues(PER_CONTAINER_LOG_FILES_OPTION);
|
||||
}
|
||||
|
@ -303,6 +308,8 @@ public class LogsCLI extends Configured implements Tool {
|
|||
LogCLIHelpers logCliHelper = new LogCLIHelpers();
|
||||
logCliHelper.setConf(getConf());
|
||||
|
||||
yarnClient = createYarnClient();
|
||||
|
||||
YarnApplicationState appState = YarnApplicationState.NEW;
|
||||
ApplicationReport appReport = null;
|
||||
try {
|
||||
|
@ -824,6 +831,8 @@ public class LogsCLI extends Configured implements Tool {
|
|||
+ "By default, it will print all available logs."
|
||||
+ " Work with -log_files to get only specific logs. If specified, the"
|
||||
+ " applicationId can be omitted");
|
||||
opts.addOption(CLUSTER_ID_OPTION, true, "ClusterId. "
|
||||
+ "By default, it will take default cluster id from the RM");
|
||||
opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format "
|
||||
+ "nodename:port");
|
||||
opts.addOption(APP_OWNER_OPTION, true,
|
||||
|
@ -892,6 +901,7 @@ public class LogsCLI extends Configured implements Tool {
|
|||
+ "and fetch all logs.");
|
||||
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
|
||||
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
|
||||
opts.getOption(CLUSTER_ID_OPTION).setArgName("Cluster ID");
|
||||
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
|
||||
opts.getOption(APP_OWNER_OPTION).setArgName("Application Owner");
|
||||
opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
|
||||
|
@ -913,6 +923,7 @@ public class LogsCLI extends Configured implements Tool {
|
|||
Options printOpts = new Options();
|
||||
printOpts.addOption(commandOpts.getOption(HELP_CMD));
|
||||
printOpts.addOption(commandOpts.getOption(CONTAINER_ID_OPTION));
|
||||
printOpts.addOption(commandOpts.getOption(CLUSTER_ID_OPTION));
|
||||
printOpts.addOption(commandOpts.getOption(NODE_ADDRESS_OPTION));
|
||||
printOpts.addOption(commandOpts.getOption(APP_OWNER_OPTION));
|
||||
printOpts.addOption(commandOpts.getOption(AM_CONTAINER_OPTION));
|
||||
|
|
|
@ -0,0 +1,240 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineReaderClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* This class is to test class {@link AHSv2ClientImpl).
|
||||
*/
|
||||
public class TestAHSv2ClientImpl {
|
||||
|
||||
private AHSv2ClientImpl client;
|
||||
private TimelineReaderClient spyTimelineReaderClient;
|
||||
@Before
|
||||
public void setup() {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
client = new AHSv2ClientImpl();
|
||||
spyTimelineReaderClient = mock(TimelineReaderClient.class);
|
||||
client.setReaderClient(spyTimelineReaderClient);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetContainerReport() throws IOException, YarnException {
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
final ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
final ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
when(spyTimelineReaderClient.getContainerEntity(containerId, "ALL", null))
|
||||
.thenReturn(createContainerEntity(containerId));
|
||||
ContainerReport report = client.getContainerReport(containerId);
|
||||
Assert.assertEquals(report.getContainerId(), containerId);
|
||||
Assert.assertEquals(report.getAssignedNode().getHost(), "test host");
|
||||
Assert.assertEquals(report.getAssignedNode().getPort(), 100);
|
||||
Assert.assertEquals(report.getAllocatedResource().getVirtualCores(), 8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAppAttemptReport() throws IOException, YarnException {
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
final ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
when(spyTimelineReaderClient.getApplicationAttemptEntity(appAttemptId,
|
||||
"ALL", null))
|
||||
.thenReturn(createAppAttemptTimelineEntity(appAttemptId));
|
||||
ApplicationAttemptReport report =
|
||||
client.getApplicationAttemptReport(appAttemptId);
|
||||
Assert.assertEquals(report.getApplicationAttemptId(), appAttemptId);
|
||||
Assert.assertEquals(report.getFinishTime(), Integer.MAX_VALUE + 2L);
|
||||
Assert.assertEquals(report.getOriginalTrackingUrl(),
|
||||
"test original tracking url");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAppReport() throws IOException, YarnException {
|
||||
final ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
when(spyTimelineReaderClient.getApplicationEntity(appId, "ALL", null))
|
||||
.thenReturn(createApplicationTimelineEntity(appId, false, false));
|
||||
ApplicationReport report = client.getApplicationReport(appId);
|
||||
Assert.assertEquals(report.getApplicationId(), appId);
|
||||
Assert.assertEquals(report.getAppNodeLabelExpression(), "test_node_label");
|
||||
Assert.assertTrue(report.getApplicationTags().contains("Test_APP_TAGS_1"));
|
||||
Assert.assertEquals(report.getYarnApplicationState(),
|
||||
YarnApplicationState.FINISHED);
|
||||
}
|
||||
|
||||
private static TimelineEntity createApplicationTimelineEntity(
|
||||
ApplicationId appId, boolean emptyACLs,
|
||||
boolean wrongAppId) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setType(ApplicationMetricsConstants.ENTITY_TYPE);
|
||||
if (wrongAppId) {
|
||||
entity.setId("wrong_app_id");
|
||||
} else {
|
||||
entity.setId(appId.toString());
|
||||
}
|
||||
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app");
|
||||
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
||||
"test app type");
|
||||
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "user1");
|
||||
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
||||
"test queue");
|
||||
entityInfo.put(
|
||||
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, "false");
|
||||
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
|
||||
Priority.newInstance(0));
|
||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||
Integer.MAX_VALUE + 1L);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_METRICS, 123);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_METRICS, 345);
|
||||
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS, 456);
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS, 789);
|
||||
|
||||
if (emptyACLs) {
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, "");
|
||||
} else {
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
|
||||
"user2");
|
||||
}
|
||||
|
||||
Set<String> appTags = new HashSet<String>();
|
||||
appTags.add("Test_APP_TAGS_1");
|
||||
appTags.add("Test_APP_TAGS_2");
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, appTags);
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
Map<String, String> configs = new HashMap<>();
|
||||
configs.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
|
||||
"test_node_label");
|
||||
entity.setConfigs(configs);
|
||||
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(Integer.MAX_VALUE + 1L + appId.getId());
|
||||
entity.addEvent(tEvent);
|
||||
|
||||
// send a YARN_APPLICATION_STATE_UPDATED event
|
||||
// after YARN_APPLICATION_FINISHED
|
||||
// The final YarnApplicationState should not be changed
|
||||
tEvent = new TimelineEvent();
|
||||
tEvent.setId(
|
||||
ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(Integer.MAX_VALUE + 2L + appId.getId());
|
||||
Map<String, Object> eventInfo = new HashMap<>();
|
||||
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
|
||||
YarnApplicationState.KILLED);
|
||||
tEvent.setInfo(eventInfo);
|
||||
entity.addEvent(tEvent);
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
private static TimelineEntity createAppAttemptTimelineEntity(
|
||||
ApplicationAttemptId appAttemptId) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setType(AppAttemptMetricsConstants.ENTITY_TYPE);
|
||||
entity.setId(appAttemptId.toString());
|
||||
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(AppAttemptMetricsConstants.TRACKING_URL_INFO,
|
||||
"test tracking url");
|
||||
entityInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO,
|
||||
"test original tracking url");
|
||||
entityInfo.put(AppAttemptMetricsConstants.HOST_INFO, "test host");
|
||||
entityInfo.put(AppAttemptMetricsConstants.RPC_PORT_INFO, 100);
|
||||
entityInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
|
||||
ContainerId.newContainerId(appAttemptId, 1));
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
|
||||
entity.addEvent(tEvent);
|
||||
|
||||
tEvent = new TimelineEvent();
|
||||
tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
|
||||
entity.addEvent(tEvent);
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
private static TimelineEntity createContainerEntity(ContainerId containerId) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
entity.setType(ContainerMetricsConstants.ENTITY_TYPE);
|
||||
entity.setId(containerId.toString());
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO, 1024);
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO, 8);
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
|
||||
"test host");
|
||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO, 100);
|
||||
entityInfo
|
||||
.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO, -1);
|
||||
entityInfo.put(ContainerMetricsConstants
|
||||
.ALLOCATED_HOST_HTTP_ADDRESS_INFO, "http://test:1234");
|
||||
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
||||
"test diagnostics info");
|
||||
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, -1);
|
||||
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
|
||||
ContainerState.COMPLETE.toString());
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
|
||||
tEvent.setTimestamp(123456);
|
||||
entity.addEvent(tEvent);
|
||||
|
||||
return entity;
|
||||
}
|
||||
}
|
|
@ -246,6 +246,9 @@ public class TestLogsCLI {
|
|||
pw.println(" --client_max_retries to");
|
||||
pw.println(" create a retry client. The");
|
||||
pw.println(" default value is 1000.");
|
||||
pw.println(" -clusterId <Cluster ID> ClusterId. By default, it");
|
||||
pw.println(" will take default cluster id");
|
||||
pw.println(" from the RM");
|
||||
pw.println(" -containerId <Container ID> ContainerId. By default, it");
|
||||
pw.println(" will print all available");
|
||||
pw.println(" logs. Work with -log_files");
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/** Yarn Common Metrics package. **/
|
||||
@InterfaceAudience.Private
|
||||
package org.apache.hadoop.yarn.server.metrics;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
|
@ -0,0 +1,449 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.util.timeline;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Utility class to generate reports from timeline entities.
|
||||
*/
|
||||
public final class TimelineEntityV2Converter {
|
||||
private TimelineEntityV2Converter() {
|
||||
}
|
||||
|
||||
public static ContainerReport convertToContainerReport(
|
||||
TimelineEntity entity) {
|
||||
int allocatedMem = 0;
|
||||
int allocatedVcore = 0;
|
||||
String allocatedHost = null;
|
||||
int allocatedPort = -1;
|
||||
int allocatedPriority = 0;
|
||||
long createdTime = 0;
|
||||
long finishedTime = 0;
|
||||
String diagnosticsInfo = null;
|
||||
int exitStatus = ContainerExitStatus.INVALID;
|
||||
ContainerState state = null;
|
||||
String nodeHttpAddress = null;
|
||||
Map<String, Object> entityInfo = entity.getInfo();
|
||||
if (entityInfo != null) {
|
||||
if (entityInfo
|
||||
.containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO)) {
|
||||
allocatedMem = (Integer) entityInfo.get(
|
||||
ContainerMetricsConstants.ALLOCATED_MEMORY_INFO);
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_INFO)) {
|
||||
allocatedVcore = (Integer) entityInfo.get(
|
||||
ContainerMetricsConstants.ALLOCATED_VCORE_INFO);
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ContainerMetricsConstants.ALLOCATED_HOST_INFO)) {
|
||||
allocatedHost =
|
||||
entityInfo
|
||||
.get(ContainerMetricsConstants.ALLOCATED_HOST_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ContainerMetricsConstants.ALLOCATED_PORT_INFO)) {
|
||||
allocatedPort = (Integer) entityInfo.get(
|
||||
ContainerMetricsConstants.ALLOCATED_PORT_INFO);
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO)) {
|
||||
allocatedPriority = Integer.parseInt(entityInfo.get(
|
||||
ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO).toString());
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO)) {
|
||||
nodeHttpAddress =
|
||||
(String) entityInfo.get(
|
||||
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
||||
}
|
||||
if (entityInfo.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)) {
|
||||
diagnosticsInfo =
|
||||
entityInfo.get(
|
||||
ContainerMetricsConstants.DIAGNOSTICS_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)) {
|
||||
exitStatus = (Integer) entityInfo.get(
|
||||
ContainerMetricsConstants.EXIT_STATUS_INFO);
|
||||
}
|
||||
if (entityInfo.containsKey(ContainerMetricsConstants.STATE_INFO)) {
|
||||
state =
|
||||
ContainerState.valueOf(entityInfo.get(
|
||||
ContainerMetricsConstants.STATE_INFO).toString());
|
||||
}
|
||||
}
|
||||
NavigableSet<TimelineEvent> events = entity.getEvents();
|
||||
if (events != null) {
|
||||
for (TimelineEvent event : events) {
|
||||
if (event.getId().equals(
|
||||
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE)) {
|
||||
createdTime = event.getTimestamp();
|
||||
} else if (event.getId().equals(
|
||||
ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE)) {
|
||||
finishedTime = event.getTimestamp();
|
||||
}
|
||||
}
|
||||
}
|
||||
String logUrl = null;
|
||||
NodeId allocatedNode = null;
|
||||
if (allocatedHost != null) {
|
||||
allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort);
|
||||
}
|
||||
return ContainerReport.newInstance(
|
||||
ContainerId.fromString(entity.getId()),
|
||||
Resource.newInstance(allocatedMem, allocatedVcore), allocatedNode,
|
||||
Priority.newInstance(allocatedPriority),
|
||||
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state,
|
||||
nodeHttpAddress);
|
||||
}
|
||||
|
||||
public static ApplicationAttemptReport convertToApplicationAttemptReport(
|
||||
TimelineEntity entity) {
|
||||
String host = null;
|
||||
int rpcPort = -1;
|
||||
ContainerId amContainerId = null;
|
||||
String trackingUrl = null;
|
||||
String originalTrackingUrl = null;
|
||||
String diagnosticsInfo = null;
|
||||
YarnApplicationAttemptState state = null;
|
||||
Map<String, Object> entityInfo = entity.getInfo();
|
||||
long startTime = 0;
|
||||
long finishTime = 0;
|
||||
|
||||
if (entityInfo != null) {
|
||||
if (entityInfo.containsKey(AppAttemptMetricsConstants.HOST_INFO)) {
|
||||
host =
|
||||
entityInfo.get(AppAttemptMetricsConstants.HOST_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(AppAttemptMetricsConstants.RPC_PORT_INFO)) {
|
||||
rpcPort = (Integer) entityInfo.get(
|
||||
AppAttemptMetricsConstants.RPC_PORT_INFO);
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) {
|
||||
amContainerId =
|
||||
ContainerId.fromString(entityInfo.get(
|
||||
AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)
|
||||
.toString());
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(AppAttemptMetricsConstants.TRACKING_URL_INFO)) {
|
||||
trackingUrl =
|
||||
entityInfo.get(
|
||||
AppAttemptMetricsConstants.TRACKING_URL_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(
|
||||
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)) {
|
||||
originalTrackingUrl =
|
||||
entityInfo
|
||||
.get(
|
||||
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO)) {
|
||||
diagnosticsInfo =
|
||||
entityInfo.get(
|
||||
AppAttemptMetricsConstants.DIAGNOSTICS_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(AppAttemptMetricsConstants.STATE_INFO)) {
|
||||
state =
|
||||
YarnApplicationAttemptState.valueOf(entityInfo.get(
|
||||
AppAttemptMetricsConstants.STATE_INFO)
|
||||
.toString());
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)) {
|
||||
amContainerId =
|
||||
ContainerId.fromString(entityInfo.get(
|
||||
AppAttemptMetricsConstants.MASTER_CONTAINER_INFO)
|
||||
.toString());
|
||||
}
|
||||
}
|
||||
NavigableSet<TimelineEvent> events = entity.getEvents();
|
||||
if (events != null) {
|
||||
for (TimelineEvent event : events) {
|
||||
if (event.getId().equals(
|
||||
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
|
||||
startTime = event.getTimestamp();
|
||||
} else if (event.getId().equals(
|
||||
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
|
||||
finishTime = event.getTimestamp();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ApplicationAttemptReport.newInstance(
|
||||
ApplicationAttemptId.fromString(entity.getId()),
|
||||
host, rpcPort, trackingUrl, originalTrackingUrl, diagnosticsInfo,
|
||||
state, amContainerId, startTime, finishTime);
|
||||
}
|
||||
|
||||
public static ApplicationReport convertToApplicationReport(
|
||||
TimelineEntity entity) {
|
||||
String user = null;
|
||||
String queue = null;
|
||||
String name = null;
|
||||
String type = null;
|
||||
boolean unmanagedApplication = false;
|
||||
long createdTime = 0;
|
||||
long finishedTime = 0;
|
||||
float progress = 0.0f;
|
||||
int applicationPriority = 0;
|
||||
ApplicationAttemptId latestApplicationAttemptId = null;
|
||||
String diagnosticsInfo = null;
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED;
|
||||
YarnApplicationState state = YarnApplicationState.ACCEPTED;
|
||||
ApplicationResourceUsageReport appResources = null;
|
||||
Set<String> appTags = null;
|
||||
String appNodeLabelExpression = null;
|
||||
String amNodeLabelExpression = null;
|
||||
Map<String, Object> entityInfo = entity.getInfo();
|
||||
if (entityInfo != null) {
|
||||
if (entityInfo.containsKey(
|
||||
ApplicationMetricsConstants.USER_ENTITY_INFO)) {
|
||||
user =
|
||||
entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) {
|
||||
queue =
|
||||
entityInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ApplicationMetricsConstants.NAME_ENTITY_INFO)) {
|
||||
name =
|
||||
entityInfo.get(ApplicationMetricsConstants.NAME_ENTITY_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ApplicationMetricsConstants.TYPE_ENTITY_INFO)) {
|
||||
type =
|
||||
entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ApplicationMetricsConstants.TYPE_ENTITY_INFO)) {
|
||||
type =
|
||||
entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(
|
||||
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)) {
|
||||
unmanagedApplication =
|
||||
Boolean.parseBoolean(entityInfo.get(
|
||||
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO)
|
||||
.toString());
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO)) {
|
||||
applicationPriority = Integer.parseInt(entityInfo.get(
|
||||
ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO).toString());
|
||||
}
|
||||
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_TAGS_INFO)) {
|
||||
appTags = new HashSet<>();
|
||||
Object obj = entityInfo.get(ApplicationMetricsConstants.APP_TAGS_INFO);
|
||||
if (obj != null && obj instanceof Collection<?>) {
|
||||
for(Object o : (Collection<?>)obj) {
|
||||
if (o != null) {
|
||||
appTags.add(o.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(
|
||||
ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)) {
|
||||
latestApplicationAttemptId = ApplicationAttemptId.fromString(
|
||||
entityInfo.get(
|
||||
ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)
|
||||
.toString());
|
||||
}
|
||||
if (entityInfo.containsKey(
|
||||
ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
|
||||
diagnosticsInfo =
|
||||
entityInfo.get(
|
||||
ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
|
||||
.toString();
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)) {
|
||||
finalStatus =
|
||||
FinalApplicationStatus.valueOf(entityInfo.get(
|
||||
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)
|
||||
.toString());
|
||||
}
|
||||
if (entityInfo
|
||||
.containsKey(ApplicationMetricsConstants.STATE_EVENT_INFO)) {
|
||||
state =
|
||||
YarnApplicationState.valueOf(entityInfo.get(
|
||||
ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> configs = entity.getConfigs();
|
||||
if (configs
|
||||
.containsKey(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION)) {
|
||||
appNodeLabelExpression = configs
|
||||
.get(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION);
|
||||
}
|
||||
if (configs
|
||||
.containsKey(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION)) {
|
||||
amNodeLabelExpression =
|
||||
configs.get(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION);
|
||||
}
|
||||
|
||||
Set<TimelineMetric> metrics = entity.getMetrics();
|
||||
if (metrics != null) {
|
||||
long vcoreSeconds = 0;
|
||||
long memorySeconds = 0;
|
||||
long preemptedVcoreSeconds = 0;
|
||||
long preemptedMemorySeconds = 0;
|
||||
|
||||
for (TimelineMetric metric : metrics) {
|
||||
switch (metric.getId()) {
|
||||
case ApplicationMetricsConstants.APP_CPU_METRICS:
|
||||
vcoreSeconds = getAverageValue(metric.getValues().values());
|
||||
break;
|
||||
case ApplicationMetricsConstants.APP_MEM_METRICS:
|
||||
memorySeconds = getAverageValue(metric.getValues().values());
|
||||
break;
|
||||
case ApplicationMetricsConstants.APP_MEM_PREEMPT_METRICS:
|
||||
preemptedVcoreSeconds = getAverageValue(metric.getValues().values());
|
||||
break;
|
||||
case ApplicationMetricsConstants.APP_CPU_PREEMPT_METRICS:
|
||||
preemptedVcoreSeconds = getAverageValue(metric.getValues().values());
|
||||
break;
|
||||
default:
|
||||
// Should not happen..
|
||||
break;
|
||||
}
|
||||
}
|
||||
Map<String, Long> resourceSecondsMap = new HashMap<>();
|
||||
Map<String, Long> preemptedResoureSecondsMap = new HashMap<>();
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
|
||||
resourceSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
|
||||
preemptedResoureSecondsMap.put(ResourceInformation.MEMORY_MB.getName(),
|
||||
preemptedMemorySeconds);
|
||||
preemptedResoureSecondsMap
|
||||
.put(ResourceInformation.VCORES.getName(), preemptedVcoreSeconds);
|
||||
|
||||
appResources = ApplicationResourceUsageReport
|
||||
.newInstance(0, 0, null, null, null, resourceSecondsMap, 0, 0,
|
||||
preemptedResoureSecondsMap);
|
||||
}
|
||||
|
||||
NavigableSet<TimelineEvent> events = entity.getEvents();
|
||||
long updatedTimeStamp = 0L;
|
||||
if (events != null) {
|
||||
for (TimelineEvent event : events) {
|
||||
if (event.getId().equals(
|
||||
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
|
||||
createdTime = event.getTimestamp();
|
||||
} else if (event.getId().equals(
|
||||
ApplicationMetricsConstants.UPDATED_EVENT_TYPE)) {
|
||||
// This type of events are parsed in time-stamp descending order
|
||||
// which means the previous event could override the information
|
||||
// from the later same type of event. Hence compare timestamp
|
||||
// before over writing.
|
||||
if (event.getTimestamp() > updatedTimeStamp) {
|
||||
updatedTimeStamp = event.getTimestamp();
|
||||
}
|
||||
} else if (event.getId().equals(
|
||||
ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE)) {
|
||||
Map<String, Object> eventInfo = event.getInfo();
|
||||
if (eventInfo == null) {
|
||||
continue;
|
||||
}
|
||||
if (eventInfo.containsKey(
|
||||
ApplicationMetricsConstants.STATE_EVENT_INFO)) {
|
||||
if (state == YarnApplicationState.ACCEPTED) {
|
||||
state = YarnApplicationState.valueOf(eventInfo.get(
|
||||
ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
|
||||
}
|
||||
}
|
||||
} else if (event.getId().equals(
|
||||
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
|
||||
progress=1.0F;
|
||||
state = YarnApplicationState.FINISHED;
|
||||
finishedTime = event.getTimestamp();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ApplicationReport.newInstance(
|
||||
ApplicationId.fromString(entity.getId()),
|
||||
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
|
||||
diagnosticsInfo, null, createdTime, finishedTime, finalStatus,
|
||||
appResources, null, progress, type, null, appTags, unmanagedApplication,
|
||||
Priority.newInstance(applicationPriority), appNodeLabelExpression,
|
||||
amNodeLabelExpression);
|
||||
}
|
||||
|
||||
private static long getAverageValue(Collection<Number> values) {
|
||||
if (values == null || values.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
long sum = 0;
|
||||
for (Number value : values) {
|
||||
sum += value.longValue();
|
||||
}
|
||||
return sum/values.size();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue