diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8a05b5cbd1c..bc20e884154 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -225,6 +225,9 @@ Release 2.7.0 - UNRELEASED YARN-3108. ApplicationHistoryServer doesn't process -D arguments (Chang Li via jeagles) + YARN-2808. Made YARN CLI list attempt’s finished containers of a running + application. (Naganarasimha G R via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index e4f31f20d84..99c0f0290d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -118,7 +120,7 @@ public class YarnClientImpl extends YarnClient { protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; private long asyncApiPollTimeoutMillis; - private AHSClient historyClient; + protected AHSClient historyClient; private boolean historyServiceEnabled; protected TimelineClient timelineClient; @VisibleForTesting @@ -647,24 +649,79 @@ public class YarnClientImpl extends YarnClient { public List getContainers( ApplicationAttemptId applicationAttemptId) throws YarnException, IOException { + List containersForAttempt = + new ArrayList(); + boolean appNotFoundInRM = false; try { - GetContainersRequest request = Records - .newRecord(GetContainersRequest.class); + GetContainersRequest request = + Records.newRecord(GetContainersRequest.class); request.setApplicationAttemptId(applicationAttemptId); GetContainersResponse response = rmClient.getContainers(request); - return response.getContainerList(); + containersForAttempt.addAll(response.getContainerList()); } catch (YarnException e) { - if (!historyServiceEnabled) { - // Just throw it as usual if historyService is not enabled. + if (e.getClass() != ApplicationNotFoundException.class + || !historyServiceEnabled) { + // If Application is not in RM and history service is enabled then we + // need to check with history service else throw exception. throw e; } - // Even if history-service is enabled, treat all exceptions still the same - // except the following - if (e.getClass() != ApplicationNotFoundException.class) { - throw e; - } - return historyClient.getContainers(applicationAttemptId); + appNotFoundInRM = true; } + + if (historyServiceEnabled) { + // Check with AHS even if found in RM because to capture info of finished + // containers also + List containersListFromAHS = null; + try { + containersListFromAHS = + historyClient.getContainers(applicationAttemptId); + } catch (IOException e) { + // History service access might be enabled but system metrics publisher + // is disabled hence app not found exception is possible + if (appNotFoundInRM) { + // app not found in bothM and RM then propagate the exception. + throw e; + } + } + + if (null != containersListFromAHS && containersListFromAHS.size() > 0) { + // remove duplicates + + Set containerIdsToBeKeptFromAHS = + new HashSet(); + Iterator tmpItr = containersListFromAHS.iterator(); + while (tmpItr.hasNext()) { + containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId()); + } + + Iterator rmContainers = + containersForAttempt.iterator(); + while (rmContainers.hasNext()) { + ContainerReport tmp = rmContainers.next(); + containerIdsToBeKeptFromAHS.remove(tmp.getContainerId()); + // Remove containers from AHS as container from RM will have latest + // information + } + + if (containerIdsToBeKeptFromAHS.size() > 0 + && containersListFromAHS.size() != containerIdsToBeKeptFromAHS + .size()) { + Iterator containersFromHS = + containersListFromAHS.iterator(); + while (containersFromHS.hasNext()) { + ContainerReport containerReport = containersFromHS.next(); + if (containerIdsToBeKeptFromAHS.contains(containerReport + .getContainerId())) { + containersForAttempt.add(containerReport); + } + } + } else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS + .size()) { + containersForAttempt.addAll(containersListFromAHS); + } + } + } + return containersForAttempt; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 02f28821555..785968871ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.AHSClient; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; @@ -338,6 +339,9 @@ public class TestYarnClient { @Test(timeout = 10000) public void testGetContainers() throws YarnException, IOException { Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, + true); + final YarnClient client = new MockYarnClient(); client.init(conf); client.start(); @@ -351,6 +355,17 @@ public class TestYarnClient { (ContainerId.newContainerId(appAttemptId, 1))); Assert.assertEquals(reports.get(1).getContainerId(), (ContainerId.newContainerId(appAttemptId, 2))); + Assert.assertEquals(reports.get(2).getContainerId(), + (ContainerId.newContainerId(appAttemptId, 3))); + + //First2 containers should come from RM with updated state information and + // 3rd container is not there in RM and should + Assert.assertEquals(ContainerState.RUNNING, + (reports.get(0).getContainerState())); + Assert.assertEquals(ContainerState.RUNNING, + (reports.get(1).getContainerState())); + Assert.assertEquals(ContainerState.COMPLETE, + (reports.get(2).getContainerState())); client.stop(); } @@ -383,6 +398,9 @@ public class TestYarnClient { new HashMap>(); private HashMap> containers = new HashMap>(); + private HashMap> containersFromAHS = + new HashMap>(); + GetApplicationsResponse mockAppResponse = mock(GetApplicationsResponse.class); GetApplicationAttemptsResponse mockAppAttemptsResponse = @@ -428,6 +446,9 @@ public class TestYarnClient { when(rmClient.getContainerReport(any(GetContainerReportRequest.class))) .thenReturn(mockContainerResponse); + + historyClient = mock(AHSClient.class); + } catch (YarnException e) { Assert.fail("Exception is not expected."); } catch (IOException e) { @@ -501,15 +522,37 @@ public class TestYarnClient { ContainerReport container = ContainerReport.newInstance( ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null, NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, - "diagnosticInfo", "logURL", 0, ContainerState.COMPLETE); + "diagnosticInfo", "logURL", 0, ContainerState.RUNNING); containerReports.add(container); ContainerReport container1 = ContainerReport.newInstance( ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null, NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, - "diagnosticInfo", "logURL", 0, ContainerState.COMPLETE); + "diagnosticInfo", "logURL", 0, ContainerState.RUNNING); containerReports.add(container1); containers.put(attempt.getApplicationAttemptId(), containerReports); + + //add containers to be sent from AHS + List containerReportsForAHS = + new ArrayList(); + + container = ContainerReport.newInstance( + ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null, + NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, + "diagnosticInfo", "logURL", 0, null); + containerReportsForAHS.add(container); + + container1 = ContainerReport.newInstance( + ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null, + NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, + "diagnosticInfo", "HSlogURL", 0, null); + containerReportsForAHS.add(container1); + ContainerReport container2 = ContainerReport.newInstance( + ContainerId.newContainerId(attempt.getApplicationAttemptId(),3), null, + NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, + "diagnosticInfo", "HSlogURL", 0, ContainerState.COMPLETE); + containerReportsForAHS.add(container2); + containersFromAHS.put(attempt.getApplicationAttemptId(), containerReportsForAHS); ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6); ApplicationReport newApplicationReport2 = ApplicationReport.newInstance( @@ -586,9 +629,16 @@ public class TestYarnClient { IOException { when(mockContainersResponse.getContainerList()).thenReturn( getContainersReport(appAttemptId)); + when(historyClient.getContainers(any(ApplicationAttemptId.class))) + .thenReturn(getContainersFromAHS(appAttemptId)); return super.getContainers(appAttemptId); } + private List getContainersFromAHS( + ApplicationAttemptId appAttemptId) { + return containersFromAHS.get(appAttemptId); + } + @Override public ContainerReport getContainerReport(ContainerId containerId) throws YarnException, IOException {