YARN-2808. Made YARN CLI list attempt’s finished containers of a running application. Contributed by Naganarasimha G R.

(cherry picked from commit c0ec4f4cc12208342d45236b8841887a20f7f88a)
This commit is contained in:
Zhijie Shen 2015-02-02 12:03:20 -08:00
parent dedfb982f1
commit b51c0288de
3 changed files with 124 additions and 14 deletions

View File

@ -188,6 +188,9 @@ Release 2.7.0 - UNRELEASED
YARN-3108. ApplicationHistoryServer doesn't process -D arguments (Chang Li
via jeagles)
YARN-2808. Made YARN CLI list attempts finished containers of a running
application. (Naganarasimha G R via zjshen)
OPTIMIZATIONS
BUG FIXES

View File

@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -118,7 +120,7 @@ public class YarnClientImpl extends YarnClient {
protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis;
private long asyncApiPollTimeoutMillis;
private AHSClient historyClient;
protected AHSClient historyClient;
private boolean historyServiceEnabled;
protected TimelineClient timelineClient;
@VisibleForTesting
@ -647,24 +649,79 @@ public ContainerReport getContainerReport(ContainerId containerId)
public List<ContainerReport> getContainers(
ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException {
List<ContainerReport> containersForAttempt =
new ArrayList<ContainerReport>();
boolean appNotFoundInRM = false;
try {
GetContainersRequest request = Records
.newRecord(GetContainersRequest.class);
GetContainersRequest request =
Records.newRecord(GetContainersRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
GetContainersResponse response = rmClient.getContainers(request);
return response.getContainerList();
containersForAttempt.addAll(response.getContainerList());
} catch (YarnException e) {
if (!historyServiceEnabled) {
// Just throw it as usual if historyService is not enabled.
if (e.getClass() != ApplicationNotFoundException.class
|| !historyServiceEnabled) {
// If Application is not in RM and history service is enabled then we
// need to check with history service else throw exception.
throw e;
}
// Even if history-service is enabled, treat all exceptions still the same
// except the following
if (e.getClass() != ApplicationNotFoundException.class) {
throw e;
}
return historyClient.getContainers(applicationAttemptId);
appNotFoundInRM = true;
}
if (historyServiceEnabled) {
// Check with AHS even if found in RM because to capture info of finished
// containers also
List<ContainerReport> containersListFromAHS = null;
try {
containersListFromAHS =
historyClient.getContainers(applicationAttemptId);
} catch (IOException e) {
// History service access might be enabled but system metrics publisher
// is disabled hence app not found exception is possible
if (appNotFoundInRM) {
// app not found in bothM and RM then propagate the exception.
throw e;
}
}
if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
// remove duplicates
Set<ContainerId> containerIdsToBeKeptFromAHS =
new HashSet<ContainerId>();
Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
while (tmpItr.hasNext()) {
containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId());
}
Iterator<ContainerReport> rmContainers =
containersForAttempt.iterator();
while (rmContainers.hasNext()) {
ContainerReport tmp = rmContainers.next();
containerIdsToBeKeptFromAHS.remove(tmp.getContainerId());
// Remove containers from AHS as container from RM will have latest
// information
}
if (containerIdsToBeKeptFromAHS.size() > 0
&& containersListFromAHS.size() != containerIdsToBeKeptFromAHS
.size()) {
Iterator<ContainerReport> containersFromHS =
containersListFromAHS.iterator();
while (containersFromHS.hasNext()) {
ContainerReport containerReport = containersFromHS.next();
if (containerIdsToBeKeptFromAHS.contains(containerReport
.getContainerId())) {
containersForAttempt.add(containerReport);
}
}
} else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS
.size()) {
containersForAttempt.addAll(containersListFromAHS);
}
}
}
return containersForAttempt;
}
@Override

View File

@ -91,6 +91,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AHSClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@ -338,6 +339,9 @@ public void testGetApplicationAttempt() throws YarnException, IOException {
@Test(timeout = 10000)
public void testGetContainers() throws YarnException, IOException {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
true);
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
@ -351,6 +355,17 @@ public void testGetContainers() throws YarnException, IOException {
(ContainerId.newContainerId(appAttemptId, 1)));
Assert.assertEquals(reports.get(1).getContainerId(),
(ContainerId.newContainerId(appAttemptId, 2)));
Assert.assertEquals(reports.get(2).getContainerId(),
(ContainerId.newContainerId(appAttemptId, 3)));
//First2 containers should come from RM with updated state information and
// 3rd container is not there in RM and should
Assert.assertEquals(ContainerState.RUNNING,
(reports.get(0).getContainerState()));
Assert.assertEquals(ContainerState.RUNNING,
(reports.get(1).getContainerState()));
Assert.assertEquals(ContainerState.COMPLETE,
(reports.get(2).getContainerState()));
client.stop();
}
@ -383,6 +398,9 @@ private static class MockYarnClient extends YarnClientImpl {
new HashMap<ApplicationId, List<ApplicationAttemptReport>>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containers =
new HashMap<ApplicationAttemptId, List<ContainerReport>>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containersFromAHS =
new HashMap<ApplicationAttemptId, List<ContainerReport>>();
GetApplicationsResponse mockAppResponse =
mock(GetApplicationsResponse.class);
GetApplicationAttemptsResponse mockAppAttemptsResponse =
@ -428,6 +446,9 @@ public void start() {
when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
.thenReturn(mockContainerResponse);
historyClient = mock(AHSClient.class);
} catch (YarnException e) {
Assert.fail("Exception is not expected.");
} catch (IOException e) {
@ -501,15 +522,37 @@ private List<ApplicationReport> createAppReports() {
ContainerReport container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
"diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
containerReports.add(container);
ContainerReport container1 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
"diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
containerReports.add(container1);
containers.put(attempt.getApplicationAttemptId(), containerReports);
//add containers to be sent from AHS
List<ContainerReport> containerReportsForAHS =
new ArrayList<ContainerReport>();
container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "logURL", 0, null);
containerReportsForAHS.add(container);
container1 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "HSlogURL", 0, null);
containerReportsForAHS.add(container1);
ContainerReport container2 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(),3), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "HSlogURL", 0, ContainerState.COMPLETE);
containerReportsForAHS.add(container2);
containersFromAHS.put(attempt.getApplicationAttemptId(), containerReportsForAHS);
ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6);
ApplicationReport newApplicationReport2 = ApplicationReport.newInstance(
@ -586,9 +629,16 @@ public ApplicationAttemptReport getApplicationAttemptReport(
IOException {
when(mockContainersResponse.getContainerList()).thenReturn(
getContainersReport(appAttemptId));
when(historyClient.getContainers(any(ApplicationAttemptId.class)))
.thenReturn(getContainersFromAHS(appAttemptId));
return super.getContainers(appAttemptId);
}
private List<ContainerReport> getContainersFromAHS(
ApplicationAttemptId appAttemptId) {
return containersFromAHS.get(appAttemptId);
}
@Override
public ContainerReport getContainerReport(ContainerId containerId)
throws YarnException, IOException {