YARN-2808. Made YARN CLI list attempt’s finished containers of a running application. Contributed by Naganarasimha G R.

This commit is contained in:
Zhijie Shen 2015-02-02 12:03:20 -08:00
parent 1c09ca2ba4
commit 52575ff224
3 changed files with 124 additions and 14 deletions

View File

@ -225,6 +225,9 @@ Release 2.7.0 - UNRELEASED
YARN-3108. ApplicationHistoryServer doesn't process -D arguments (Chang Li YARN-3108. ApplicationHistoryServer doesn't process -D arguments (Chang Li
via jeagles) via jeagles)
YARN-2808. Made YARN CLI list attempts finished containers of a running
application. (Naganarasimha G R via zjshen)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -118,7 +120,7 @@ public class YarnClientImpl extends YarnClient {
protected long submitPollIntervalMillis; protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis; private long asyncApiPollIntervalMillis;
private long asyncApiPollTimeoutMillis; private long asyncApiPollTimeoutMillis;
private AHSClient historyClient; protected AHSClient historyClient;
private boolean historyServiceEnabled; private boolean historyServiceEnabled;
protected TimelineClient timelineClient; protected TimelineClient timelineClient;
@VisibleForTesting @VisibleForTesting
@ -647,24 +649,79 @@ public class YarnClientImpl extends YarnClient {
public List<ContainerReport> getContainers( public List<ContainerReport> getContainers(
ApplicationAttemptId applicationAttemptId) throws YarnException, ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException { IOException {
List<ContainerReport> containersForAttempt =
new ArrayList<ContainerReport>();
boolean appNotFoundInRM = false;
try { try {
GetContainersRequest request = Records GetContainersRequest request =
.newRecord(GetContainersRequest.class); Records.newRecord(GetContainersRequest.class);
request.setApplicationAttemptId(applicationAttemptId); request.setApplicationAttemptId(applicationAttemptId);
GetContainersResponse response = rmClient.getContainers(request); GetContainersResponse response = rmClient.getContainers(request);
return response.getContainerList(); containersForAttempt.addAll(response.getContainerList());
} catch (YarnException e) { } catch (YarnException e) {
if (!historyServiceEnabled) { if (e.getClass() != ApplicationNotFoundException.class
// Just throw it as usual if historyService is not enabled. || !historyServiceEnabled) {
// If Application is not in RM and history service is enabled then we
// need to check with history service else throw exception.
throw e; throw e;
} }
// Even if history-service is enabled, treat all exceptions still the same appNotFoundInRM = true;
// except the following
if (e.getClass() != ApplicationNotFoundException.class) {
throw e;
}
return historyClient.getContainers(applicationAttemptId);
} }
if (historyServiceEnabled) {
// Check with AHS even if found in RM because to capture info of finished
// containers also
List<ContainerReport> containersListFromAHS = null;
try {
containersListFromAHS =
historyClient.getContainers(applicationAttemptId);
} catch (IOException e) {
// History service access might be enabled but system metrics publisher
// is disabled hence app not found exception is possible
if (appNotFoundInRM) {
// app not found in bothM and RM then propagate the exception.
throw e;
}
}
if (null != containersListFromAHS && containersListFromAHS.size() > 0) {
// remove duplicates
Set<ContainerId> containerIdsToBeKeptFromAHS =
new HashSet<ContainerId>();
Iterator<ContainerReport> tmpItr = containersListFromAHS.iterator();
while (tmpItr.hasNext()) {
containerIdsToBeKeptFromAHS.add(tmpItr.next().getContainerId());
}
Iterator<ContainerReport> rmContainers =
containersForAttempt.iterator();
while (rmContainers.hasNext()) {
ContainerReport tmp = rmContainers.next();
containerIdsToBeKeptFromAHS.remove(tmp.getContainerId());
// Remove containers from AHS as container from RM will have latest
// information
}
if (containerIdsToBeKeptFromAHS.size() > 0
&& containersListFromAHS.size() != containerIdsToBeKeptFromAHS
.size()) {
Iterator<ContainerReport> containersFromHS =
containersListFromAHS.iterator();
while (containersFromHS.hasNext()) {
ContainerReport containerReport = containersFromHS.next();
if (containerIdsToBeKeptFromAHS.contains(containerReport
.getContainerId())) {
containersForAttempt.add(containerReport);
}
}
} else if (containersListFromAHS.size() == containerIdsToBeKeptFromAHS
.size()) {
containersForAttempt.addAll(containersListFromAHS);
}
}
}
return containersForAttempt;
} }
@Override @Override

View File

@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AHSClient;
import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@ -338,6 +339,9 @@ public class TestYarnClient {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testGetContainers() throws YarnException, IOException { public void testGetContainers() throws YarnException, IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
true);
final YarnClient client = new MockYarnClient(); final YarnClient client = new MockYarnClient();
client.init(conf); client.init(conf);
client.start(); client.start();
@ -351,6 +355,17 @@ public class TestYarnClient {
(ContainerId.newContainerId(appAttemptId, 1))); (ContainerId.newContainerId(appAttemptId, 1)));
Assert.assertEquals(reports.get(1).getContainerId(), Assert.assertEquals(reports.get(1).getContainerId(),
(ContainerId.newContainerId(appAttemptId, 2))); (ContainerId.newContainerId(appAttemptId, 2)));
Assert.assertEquals(reports.get(2).getContainerId(),
(ContainerId.newContainerId(appAttemptId, 3)));
//First2 containers should come from RM with updated state information and
// 3rd container is not there in RM and should
Assert.assertEquals(ContainerState.RUNNING,
(reports.get(0).getContainerState()));
Assert.assertEquals(ContainerState.RUNNING,
(reports.get(1).getContainerState()));
Assert.assertEquals(ContainerState.COMPLETE,
(reports.get(2).getContainerState()));
client.stop(); client.stop();
} }
@ -383,6 +398,9 @@ public class TestYarnClient {
new HashMap<ApplicationId, List<ApplicationAttemptReport>>(); new HashMap<ApplicationId, List<ApplicationAttemptReport>>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containers = private HashMap<ApplicationAttemptId, List<ContainerReport>> containers =
new HashMap<ApplicationAttemptId, List<ContainerReport>>(); new HashMap<ApplicationAttemptId, List<ContainerReport>>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containersFromAHS =
new HashMap<ApplicationAttemptId, List<ContainerReport>>();
GetApplicationsResponse mockAppResponse = GetApplicationsResponse mockAppResponse =
mock(GetApplicationsResponse.class); mock(GetApplicationsResponse.class);
GetApplicationAttemptsResponse mockAppAttemptsResponse = GetApplicationAttemptsResponse mockAppAttemptsResponse =
@ -428,6 +446,9 @@ public class TestYarnClient {
when(rmClient.getContainerReport(any(GetContainerReportRequest.class))) when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
.thenReturn(mockContainerResponse); .thenReturn(mockContainerResponse);
historyClient = mock(AHSClient.class);
} catch (YarnException e) { } catch (YarnException e) {
Assert.fail("Exception is not expected."); Assert.fail("Exception is not expected.");
} catch (IOException e) { } catch (IOException e) {
@ -501,16 +522,38 @@ public class TestYarnClient {
ContainerReport container = ContainerReport.newInstance( ContainerReport container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null, ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE); "diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
containerReports.add(container); containerReports.add(container);
ContainerReport container1 = ContainerReport.newInstance( ContainerReport container1 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null, ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE); "diagnosticInfo", "logURL", 0, ContainerState.RUNNING);
containerReports.add(container1); containerReports.add(container1);
containers.put(attempt.getApplicationAttemptId(), containerReports); containers.put(attempt.getApplicationAttemptId(), containerReports);
//add containers to be sent from AHS
List<ContainerReport> containerReportsForAHS =
new ArrayList<ContainerReport>();
container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "logURL", 0, null);
containerReportsForAHS.add(container);
container1 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 2), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "HSlogURL", 0, null);
containerReportsForAHS.add(container1);
ContainerReport container2 = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(),3), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
"diagnosticInfo", "HSlogURL", 0, ContainerState.COMPLETE);
containerReportsForAHS.add(container2);
containersFromAHS.put(attempt.getApplicationAttemptId(), containerReportsForAHS);
ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6); ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6);
ApplicationReport newApplicationReport2 = ApplicationReport.newInstance( ApplicationReport newApplicationReport2 = ApplicationReport.newInstance(
applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2), applicationId2, ApplicationAttemptId.newInstance(applicationId2, 2),
@ -586,9 +629,16 @@ public class TestYarnClient {
IOException { IOException {
when(mockContainersResponse.getContainerList()).thenReturn( when(mockContainersResponse.getContainerList()).thenReturn(
getContainersReport(appAttemptId)); getContainersReport(appAttemptId));
when(historyClient.getContainers(any(ApplicationAttemptId.class)))
.thenReturn(getContainersFromAHS(appAttemptId));
return super.getContainers(appAttemptId); return super.getContainers(appAttemptId);
} }
private List<ContainerReport> getContainersFromAHS(
ApplicationAttemptId appAttemptId) {
return containersFromAHS.get(appAttemptId);
}
@Override @Override
public ContainerReport getContainerReport(ContainerId containerId) public ContainerReport getContainerReport(ContainerId containerId)
throws YarnException, IOException { throws YarnException, IOException {