YARN-1389. Made ApplicationClientProtocol and ApplicationHistoryProtocol expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1577052 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1577053 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
502a0da830
commit
7834c7f193
|
@ -72,6 +72,10 @@ import org.apache.hadoop.service.AbstractService;
|
|||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
|
@ -80,6 +84,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
|
@ -363,6 +371,32 @@ public class TestClientRedirect {
|
|||
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException,
|
||||
IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
class HistoryService extends AMService implements HSClientProtocol {
|
||||
|
|
|
@ -269,6 +269,10 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
|
||||
after the submitApplication call goes through. (Xuan Gong via vinodkv)
|
||||
|
||||
YARN-1389. Made ApplicationClientProtocol and ApplicationHistoryProtocol
|
||||
expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. (Mayank
|
||||
Bansal via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -27,6 +27,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
|
@ -35,6 +39,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
|
@ -51,9 +59,13 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -367,4 +379,148 @@ public interface ApplicationClientProtocol {
|
|||
@Unstable
|
||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by clients to get a report of an Application Attempt
|
||||
* from the <code>ResourceManager</code>
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The client, via {@link GetApplicationAttemptReportRequest} provides the
|
||||
* {@link ApplicationAttemptId} of the application attempt.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* In secure mode,the <code>ResourceManager</code> verifies access to
|
||||
* the method before accepting the request.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The <code>ResourceManager</code> responds with a
|
||||
* {@link GetApplicationAttemptReportResponse} which includes the
|
||||
* {@link ApplicationAttemptReport} for the application attempt.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* If the user does not have <code>VIEW_APP</code> access then the following
|
||||
* fields in the report will be set to stubbed values:
|
||||
* <ul>
|
||||
* <li>host</li>
|
||||
* <li>RPC port</li>
|
||||
* <li>client token</li>
|
||||
* <li>diagnostics - set to "N/A"</li>
|
||||
* <li>tracking URL</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
* @param request
|
||||
* request for an application attempt report
|
||||
* @return application attempt report
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by clients to get a report of all Application attempts
|
||||
* in the cluster from the <code>ResourceManager</code>
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The <code>ResourceManager</code> responds with a
|
||||
* {@link GetApplicationAttemptsRequest} which includes the
|
||||
* {@link ApplicationAttemptReport} for all the applications attempts of a
|
||||
* specified application attempt.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* If the user does not have <code>VIEW_APP</code> access for an application
|
||||
* then the corresponding report will be filtered as described in
|
||||
* {@link #getApplicationAttemptReport(GetApplicationAttemptReportRequest)}.
|
||||
* </p>
|
||||
*
|
||||
* @param request
|
||||
* request for reports on all application attempts of an application
|
||||
* @return reports on all application attempts of an application
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by clients to get a report of an Container from the
|
||||
* <code>ResourceManager</code>
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The client, via {@link GetContainerReportRequest} provides the
|
||||
* {@link ContainerId} of the container.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* In secure mode,the <code>ResourceManager</code> verifies access to the
|
||||
* method before accepting the request.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The <code>ResourceManager</code> responds with a
|
||||
* {@link GetContainerReportResponse} which includes the
|
||||
* {@link ContainerReport} for the container.
|
||||
* </p>
|
||||
*
|
||||
* @param request
|
||||
* request for a container report
|
||||
* @return container report
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by clients to get a report of Containers for an
|
||||
* application attempt from the <code>ResourceManager</code>
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The client, via {@link GetContainersRequest} provides the
|
||||
* {@link ApplicationAttemptId} of the application attempt.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* In secure mode,the <code>ResourceManager</code> verifies access to the
|
||||
* method before accepting the request.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The <code>ResourceManager</code> responds with a
|
||||
* {@link GetContainersResponse} which includes a list of
|
||||
* {@link ContainerReport} for all the containers of a specific application
|
||||
* attempt.
|
||||
* </p>
|
||||
*
|
||||
* @param request
|
||||
* request for a list of container reports of an application attempt.
|
||||
* @return reports on all containers of an application attempt
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -45,5 +45,9 @@ service ApplicationClientProtocolService {
|
|||
rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
|
||||
rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
|
||||
rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto);
|
||||
rpc getApplicationAttemptReport (GetApplicationAttemptReportRequestProto) returns (GetApplicationAttemptReportResponseProto);
|
||||
rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto);
|
||||
rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto);
|
||||
rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
|
@ -40,6 +44,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
|
@ -282,18 +290,15 @@ public class YarnClientImpl extends YarnClient {
|
|||
request.setApplicationId(appId);
|
||||
response = rmClient.getApplicationReport(request);
|
||||
} catch (YarnException e) {
|
||||
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (!(e.getClass() == ApplicationNotFoundException.class)) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
return historyClient.getApplicationReport(appId);
|
||||
}
|
||||
return response.getApplicationReport();
|
||||
|
@ -461,38 +466,97 @@ public class YarnClientImpl extends YarnClient {
|
|||
@Override
|
||||
public ApplicationAttemptReport getApplicationAttemptReport(
|
||||
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
|
||||
if (historyServiceEnabled) {
|
||||
try {
|
||||
GetApplicationAttemptReportRequest request = Records
|
||||
.newRecord(GetApplicationAttemptReportRequest.class);
|
||||
request.setApplicationAttemptId(appAttemptId);
|
||||
GetApplicationAttemptReportResponse response = rmClient
|
||||
.getApplicationAttemptReport(request);
|
||||
return response.getApplicationAttemptReport();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getApplicationAttemptReport(appAttemptId);
|
||||
}
|
||||
throw new YarnException("History service is not enabled.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationAttemptReport> getApplicationAttempts(
|
||||
ApplicationId appId) throws YarnException, IOException {
|
||||
if (historyServiceEnabled) {
|
||||
try {
|
||||
GetApplicationAttemptsRequest request = Records
|
||||
.newRecord(GetApplicationAttemptsRequest.class);
|
||||
request.setApplicationId(appId);
|
||||
GetApplicationAttemptsResponse response = rmClient
|
||||
.getApplicationAttempts(request);
|
||||
return response.getApplicationAttemptList();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getApplicationAttempts(appId);
|
||||
}
|
||||
throw new YarnException("History service is not enabled.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerReport getContainerReport(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
if (historyServiceEnabled) {
|
||||
try {
|
||||
GetContainerReportRequest request = Records
|
||||
.newRecord(GetContainerReportRequest.class);
|
||||
request.setContainerId(containerId);
|
||||
GetContainerReportResponse response = rmClient
|
||||
.getContainerReport(request);
|
||||
return response.getContainerReport();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getContainerReport(containerId);
|
||||
}
|
||||
throw new YarnException("History service is not enabled.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerReport> getContainers(
|
||||
ApplicationAttemptId applicationAttemptId) throws YarnException,
|
||||
IOException {
|
||||
if (historyServiceEnabled) {
|
||||
try {
|
||||
GetContainersRequest request = Records
|
||||
.newRecord(GetContainersRequest.class);
|
||||
request.setApplicationAttemptId(applicationAttemptId);
|
||||
GetContainersResponse response = rmClient.getContainers(request);
|
||||
return response.getContainerList();
|
||||
} catch (YarnException e) {
|
||||
if (!historyServiceEnabled) {
|
||||
// Just throw it as usual if historyService is not enabled.
|
||||
throw e;
|
||||
}
|
||||
// Even if history-service is enabled, treat all exceptions still the same
|
||||
// except the following
|
||||
if (e.getClass() != ApplicationNotFoundException.class) {
|
||||
throw e;
|
||||
}
|
||||
return historyClient.getContainers(applicationAttemptId);
|
||||
}
|
||||
throw new YarnException("History service is not enabled.");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -40,21 +39,35 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
||||
|
@ -251,11 +264,103 @@ public class TestYarnClient {
|
|||
client.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testGetApplicationAttempts() throws YarnException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
final YarnClient client = new MockYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
List<ApplicationAttemptReport> reports = client
|
||||
.getApplicationAttempts(applicationId);
|
||||
Assert.assertNotNull(reports);
|
||||
Assert.assertEquals(reports.get(0).getApplicationAttemptId(),
|
||||
ApplicationAttemptId.newInstance(applicationId, 1));
|
||||
Assert.assertEquals(reports.get(1).getApplicationAttemptId(),
|
||||
ApplicationAttemptId.newInstance(applicationId, 2));
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testGetApplicationAttempt() throws YarnException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
final YarnClient client = new MockYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
|
||||
List<ApplicationReport> expectedReports = ((MockYarnClient) client)
|
||||
.getReports();
|
||||
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||
applicationId, 1);
|
||||
ApplicationAttemptReport report = client
|
||||
.getApplicationAttemptReport(appAttemptId);
|
||||
Assert.assertNotNull(report);
|
||||
Assert.assertEquals(report.getApplicationAttemptId().toString(),
|
||||
expectedReports.get(0).getCurrentApplicationAttemptId().toString());
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testGetContainers() throws YarnException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
final YarnClient client = new MockYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||
applicationId, 1);
|
||||
List<ContainerReport> reports = client.getContainers(appAttemptId);
|
||||
Assert.assertNotNull(reports);
|
||||
Assert.assertEquals(reports.get(0).getContainerId(),
|
||||
(ContainerId.newInstance(appAttemptId, 1)));
|
||||
Assert.assertEquals(reports.get(1).getContainerId(),
|
||||
(ContainerId.newInstance(appAttemptId, 2)));
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testGetContainerReport() throws YarnException, IOException {
|
||||
Configuration conf = new Configuration();
|
||||
final YarnClient client = new MockYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
|
||||
List<ApplicationReport> expectedReports = ((MockYarnClient) client)
|
||||
.getReports();
|
||||
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
||||
applicationId, 1);
|
||||
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
|
||||
ContainerReport report = client.getContainerReport(containerId);
|
||||
Assert.assertNotNull(report);
|
||||
Assert.assertEquals(report.getContainerId().toString(),
|
||||
(ContainerId.newInstance(expectedReports.get(0)
|
||||
.getCurrentApplicationAttemptId(), 1)).toString());
|
||||
client.stop();
|
||||
}
|
||||
|
||||
private static class MockYarnClient extends YarnClientImpl {
|
||||
private ApplicationReport mockReport;
|
||||
private List<ApplicationReport> reports;
|
||||
private HashMap<ApplicationId, List<ApplicationAttemptReport>> attempts =
|
||||
new HashMap<ApplicationId, List<ApplicationAttemptReport>>();
|
||||
private HashMap<ApplicationAttemptId, List<ContainerReport>> containers =
|
||||
new HashMap<ApplicationAttemptId, List<ContainerReport>>();
|
||||
GetApplicationsResponse mockAppResponse =
|
||||
mock(GetApplicationsResponse.class);
|
||||
GetApplicationAttemptsResponse mockAppAttemptsResponse =
|
||||
mock(GetApplicationAttemptsResponse.class);
|
||||
GetApplicationAttemptReportResponse mockAttemptResponse =
|
||||
mock(GetApplicationAttemptReportResponse.class);
|
||||
GetContainersResponse mockContainersResponse =
|
||||
mock(GetContainersResponse.class);
|
||||
GetContainerReportResponse mockContainerResponse =
|
||||
mock(GetContainerReportResponse.class);
|
||||
|
||||
public MockYarnClient() {
|
||||
super();
|
||||
|
@ -278,6 +383,19 @@ public class TestYarnClient {
|
|||
KillApplicationRequest.class)))
|
||||
.thenReturn(KillApplicationResponse.newInstance(false)).thenReturn(
|
||||
KillApplicationResponse.newInstance(true));
|
||||
when(
|
||||
rmClient
|
||||
.getApplicationAttemptReport(any(GetApplicationAttemptReportRequest.class)))
|
||||
.thenReturn(mockAttemptResponse);
|
||||
when(
|
||||
rmClient
|
||||
.getApplicationAttempts(any(GetApplicationAttemptsRequest.class)))
|
||||
.thenReturn(mockAppAttemptsResponse);
|
||||
when(rmClient.getContainers(any(GetContainersRequest.class)))
|
||||
.thenReturn(mockContainersResponse);
|
||||
|
||||
when(rmClient.getContainerReport(any(GetContainerReportRequest.class)))
|
||||
.thenReturn(mockContainerResponse);
|
||||
} catch (YarnException e) {
|
||||
Assert.fail("Exception is not expected.");
|
||||
} catch (IOException e) {
|
||||
|
@ -320,9 +438,44 @@ public class TestYarnClient {
|
|||
"user", "queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
|
||||
List<ApplicationReport> applicationReports =
|
||||
new ArrayList<ApplicationReport>();
|
||||
List<ApplicationReport> applicationReports = new ArrayList<ApplicationReport>();
|
||||
applicationReports.add(newApplicationReport);
|
||||
List<ApplicationAttemptReport> appAttempts = new ArrayList<ApplicationAttemptReport>();
|
||||
ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
|
||||
ApplicationAttemptId.newInstance(applicationId, 1),
|
||||
"host",
|
||||
124,
|
||||
"url",
|
||||
"diagnostics",
|
||||
YarnApplicationAttemptState.FINISHED,
|
||||
ContainerId.newInstance(
|
||||
newApplicationReport.getCurrentApplicationAttemptId(), 1));
|
||||
appAttempts.add(attempt);
|
||||
ApplicationAttemptReport attempt1 = ApplicationAttemptReport.newInstance(
|
||||
ApplicationAttemptId.newInstance(applicationId, 2),
|
||||
"host",
|
||||
124,
|
||||
"url",
|
||||
"diagnostics",
|
||||
YarnApplicationAttemptState.FINISHED,
|
||||
ContainerId.newInstance(
|
||||
newApplicationReport.getCurrentApplicationAttemptId(), 2));
|
||||
appAttempts.add(attempt1);
|
||||
attempts.put(applicationId, appAttempts);
|
||||
|
||||
List<ContainerReport> containerReports = new ArrayList<ContainerReport>();
|
||||
ContainerReport container = ContainerReport.newInstance(
|
||||
ContainerId.newInstance(attempt.getApplicationAttemptId(), 1), null,
|
||||
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
|
||||
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
|
||||
containerReports.add(container);
|
||||
|
||||
ContainerReport container1 = ContainerReport.newInstance(
|
||||
ContainerId.newInstance(attempt.getApplicationAttemptId(), 2), null,
|
||||
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
|
||||
"diagnosticInfo", "logURL", 0, ContainerState.COMPLETE);
|
||||
containerReports.add(container1);
|
||||
containers.put(attempt.getApplicationAttemptId(), containerReports);
|
||||
|
||||
ApplicationId applicationId2 = ApplicationId.newInstance(1234, 6);
|
||||
ApplicationReport newApplicationReport2 = ApplicationReport.newInstance(
|
||||
|
@ -376,6 +529,57 @@ public class TestYarnClient {
|
|||
}
|
||||
return appReports;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationAttemptReport> getApplicationAttempts(
|
||||
ApplicationId appId) throws YarnException, IOException {
|
||||
when(mockAppAttemptsResponse.getApplicationAttemptList()).thenReturn(
|
||||
getAttempts(appId));
|
||||
return super.getApplicationAttempts(appId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationAttemptReport getApplicationAttemptReport(
|
||||
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
|
||||
when(mockAttemptResponse.getApplicationAttemptReport()).thenReturn(
|
||||
getAttempt(appAttemptId));
|
||||
return super.getApplicationAttemptReport(appAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerReport>
|
||||
getContainers(ApplicationAttemptId appAttemptId) throws YarnException,
|
||||
IOException {
|
||||
when(mockContainersResponse.getContainerList()).thenReturn(
|
||||
getContainersReport(appAttemptId));
|
||||
return super.getContainers(appAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerReport getContainerReport(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
when(mockContainerResponse.getContainerReport()).thenReturn(
|
||||
getContainer(containerId));
|
||||
return super.getContainerReport(containerId);
|
||||
}
|
||||
|
||||
public List<ApplicationAttemptReport> getAttempts(ApplicationId appId) {
|
||||
return attempts.get(appId);
|
||||
}
|
||||
|
||||
public ApplicationAttemptReport
|
||||
getAttempt(ApplicationAttemptId appAttemptId) {
|
||||
return attempts.get(appAttemptId.getApplicationId()).get(0);
|
||||
}
|
||||
|
||||
public List<ContainerReport> getContainersReport(
|
||||
ApplicationAttemptId appAttemptId) {
|
||||
return containers.get(appAttemptId);
|
||||
}
|
||||
|
||||
public ContainerReport getContainer(ContainerId containerId) {
|
||||
return containers.get(containerId.getApplicationAttemptId()).get(0);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
|
|
|
@ -33,6 +33,10 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|||
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
|
@ -41,6 +45,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
|
@ -59,6 +67,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
|
||||
|
@ -67,6 +79,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsReque
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerReportRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerReportResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
|
||||
|
@ -95,6 +111,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoReques
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto;
|
||||
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
@ -312,4 +333,62 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
GetApplicationAttemptReportRequestProto requestProto =
|
||||
((GetApplicationAttemptReportRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetApplicationAttemptReportResponsePBImpl(
|
||||
proxy.getApplicationAttemptReport(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
||||
GetApplicationAttemptsRequestProto requestProto =
|
||||
((GetApplicationAttemptsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetApplicationAttemptsResponsePBImpl(
|
||||
proxy.getApplicationAttempts(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException {
|
||||
GetContainerReportRequestProto requestProto =
|
||||
((GetContainerReportRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetContainerReportResponsePBImpl(proxy.getContainerReport(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
GetContainersRequestProto requestProto =
|
||||
((GetContainersRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetContainersResponsePBImpl(proxy.getContainers(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,10 +30,14 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRespo
|
|||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocolPB;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
|
@ -44,6 +48,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
|
||||
|
@ -52,6 +60,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsReque
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterMetricsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetClusterNodesResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerReportRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerReportResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainersResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetDelegationTokenResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
|
||||
|
@ -89,6 +101,14 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueue
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueuesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerReportResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainersResponseProto;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -299,4 +319,68 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponseProto getApplicationAttemptReport(
|
||||
RpcController controller, GetApplicationAttemptReportRequestProto proto)
|
||||
throws ServiceException {
|
||||
GetApplicationAttemptReportRequestPBImpl request =
|
||||
new GetApplicationAttemptReportRequestPBImpl(proto);
|
||||
try {
|
||||
GetApplicationAttemptReportResponse response =
|
||||
real.getApplicationAttemptReport(request);
|
||||
return ((GetApplicationAttemptReportResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponseProto getApplicationAttempts(
|
||||
RpcController controller, GetApplicationAttemptsRequestProto proto)
|
||||
throws ServiceException {
|
||||
GetApplicationAttemptsRequestPBImpl request =
|
||||
new GetApplicationAttemptsRequestPBImpl(proto);
|
||||
try {
|
||||
GetApplicationAttemptsResponse response =
|
||||
real.getApplicationAttempts(request);
|
||||
return ((GetApplicationAttemptsResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponseProto getContainerReport(
|
||||
RpcController controller, GetContainerReportRequestProto proto)
|
||||
throws ServiceException {
|
||||
GetContainerReportRequestPBImpl request =
|
||||
new GetContainerReportRequestPBImpl(proto);
|
||||
try {
|
||||
GetContainerReportResponse response = real.getContainerReport(request);
|
||||
return ((GetContainerReportResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponseProto getContainers(RpcController controller,
|
||||
GetContainersRequestProto proto) throws ServiceException {
|
||||
GetContainersRequestPBImpl request = new GetContainersRequestPBImpl(proto);
|
||||
try {
|
||||
GetContainersResponse response = real.getContainers(request);
|
||||
return ((GetContainersResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
|
|||
import java.security.AccessControlException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -48,6 +49,10 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
|
@ -56,6 +61,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
|
@ -74,9 +83,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
|
@ -85,7 +97,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
@ -98,7 +112,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||
|
@ -280,6 +297,189 @@ public class ClientRMService extends AbstractService implements
|
|||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||
IOException {
|
||||
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
RMApp application = this.rmContext.getRMApps().get(
|
||||
appAttemptId.getApplicationId());
|
||||
if (application == null) {
|
||||
// If the RM doesn't have the application, throw
|
||||
// ApplicationNotFoundException and let client to handle.
|
||||
throw new ApplicationNotFoundException("Application with id '"
|
||||
+ request.getApplicationAttemptId().getApplicationId()
|
||||
+ "' doesn't exist in RM.");
|
||||
}
|
||||
|
||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, application);
|
||||
GetApplicationAttemptReportResponse response = null;
|
||||
if (allowAccess) {
|
||||
RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId);
|
||||
if (appAttempt == null) {
|
||||
throw new ApplicationAttemptNotFoundException("ApplicationAttempt "
|
||||
+ appAttemptId + " Not Found in RM");
|
||||
}
|
||||
ApplicationAttemptReport attemptReport = appAttempt
|
||||
.createApplicationAttemptReport();
|
||||
response = GetApplicationAttemptReportResponse.newInstance(attemptReport);
|
||||
}else{
|
||||
throw new YarnException("User " + callerUGI.getShortUserName()
|
||||
+ " does not have privilage to see this attempt " + appAttemptId);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
||||
ApplicationId appId = request.getApplicationId();
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
RMApp application = this.rmContext.getRMApps().get(appId);
|
||||
if (application == null) {
|
||||
// If the RM doesn't have the application, throw
|
||||
// ApplicationNotFoundException and let client to handle.
|
||||
throw new ApplicationNotFoundException("Application with id '" + appId
|
||||
+ "' doesn't exist in RM.");
|
||||
}
|
||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, application);
|
||||
GetApplicationAttemptsResponse response = null;
|
||||
if (allowAccess) {
|
||||
Map<ApplicationAttemptId, RMAppAttempt> attempts = application
|
||||
.getAppAttempts();
|
||||
List<ApplicationAttemptReport> listAttempts =
|
||||
new ArrayList<ApplicationAttemptReport>();
|
||||
Iterator<Map.Entry<ApplicationAttemptId, RMAppAttempt>> iter = attempts
|
||||
.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
listAttempts.add(iter.next().getValue()
|
||||
.createApplicationAttemptReport());
|
||||
}
|
||||
response = GetApplicationAttemptsResponse.newInstance(listAttempts);
|
||||
} else {
|
||||
throw new YarnException("User " + callerUGI.getShortUserName()
|
||||
+ " does not have privilage to see this aplication " + appId);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* we're going to fix the issue of showing non-running containers of the
|
||||
* running application in YARN-1794
|
||||
*/
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException {
|
||||
ContainerId containerId = request.getContainerId();
|
||||
ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
|
||||
ApplicationId appId = appAttemptId.getApplicationId();
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
RMApp application = this.rmContext.getRMApps().get(appId);
|
||||
if (application == null) {
|
||||
// If the RM doesn't have the application, throw
|
||||
// ApplicationNotFoundException and let client to handle.
|
||||
throw new ApplicationNotFoundException("Application with id '" + appId
|
||||
+ "' doesn't exist in RM.");
|
||||
}
|
||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, application);
|
||||
GetContainerReportResponse response = null;
|
||||
if (allowAccess) {
|
||||
RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId);
|
||||
if (appAttempt == null) {
|
||||
throw new ApplicationAttemptNotFoundException("ApplicationAttempt "
|
||||
+ appAttemptId + " Not Found in RM");
|
||||
}
|
||||
RMContainer rmConatiner = this.rmContext.getScheduler().getRMContainer(
|
||||
containerId);
|
||||
if (rmConatiner == null) {
|
||||
throw new ContainerNotFoundException("Container with id " + containerId
|
||||
+ " not found");
|
||||
}
|
||||
response = GetContainerReportResponse.newInstance(rmConatiner
|
||||
.createContainerReport());
|
||||
} else {
|
||||
throw new YarnException("User " + callerUGI.getShortUserName()
|
||||
+ " does not have privilage to see this aplication " + appId);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* we're going to fix the issue of showing non-running containers of the
|
||||
* running application in YARN-1794"
|
||||
*/
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
||||
ApplicationId appId = appAttemptId.getApplicationId();
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
RMApp application = this.rmContext.getRMApps().get(appId);
|
||||
if (application == null) {
|
||||
// If the RM doesn't have the application, throw
|
||||
// ApplicationNotFoundException and let client to handle.
|
||||
throw new ApplicationNotFoundException("Application with id '" + appId
|
||||
+ "' doesn't exist in RM.");
|
||||
}
|
||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, application);
|
||||
GetContainersResponse response = null;
|
||||
if (allowAccess) {
|
||||
RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId);
|
||||
if (appAttempt == null) {
|
||||
throw new ApplicationAttemptNotFoundException("ApplicationAttempt "
|
||||
+ appAttemptId + " Not Found in RM");
|
||||
}
|
||||
Collection<RMContainer> rmContainers = Collections.emptyList();
|
||||
SchedulerAppReport schedulerAppReport =
|
||||
this.rmContext.getScheduler().getSchedulerAppInfo(appAttemptId);
|
||||
if (schedulerAppReport != null) {
|
||||
rmContainers = schedulerAppReport.getLiveContainers();
|
||||
}
|
||||
List<ContainerReport> listContainers = new ArrayList<ContainerReport>();
|
||||
for (RMContainer rmContainer : rmContainers) {
|
||||
listContainers.add(rmContainer.createContainerReport());
|
||||
}
|
||||
response = GetContainersResponse.newInstance(listContainers);
|
||||
} else {
|
||||
throw new YarnException("User " + callerUGI.getShortUserName()
|
||||
+ " does not have privilage to see this aplication " + appId);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnException {
|
||||
|
|
|
@ -26,6 +26,7 @@ import javax.crypto.SecretKey;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -196,4 +197,11 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
|||
*/
|
||||
YarnApplicationAttemptState createApplicationAttemptState();
|
||||
|
||||
/**
|
||||
* Create the Application attempt report from the {@link RMAppAttempt}
|
||||
*
|
||||
* @return {@link ApplicationAttemptReport}
|
||||
*/
|
||||
ApplicationAttemptReport createApplicationAttemptReport();
|
||||
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
@ -1630,4 +1631,20 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
private static String sanitizeTrackingUrl(String url) {
|
||||
return (url == null || url.trim().isEmpty()) ? "N/A" : url;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationAttemptReport createApplicationAttemptReport() {
|
||||
this.readLock.lock();
|
||||
ApplicationAttemptReport attemptReport = null;
|
||||
try {
|
||||
attemptReport = ApplicationAttemptReport.newInstance(this
|
||||
.getAppAttemptId(), this.getHost(), this.getRpcPort(), this
|
||||
.getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState
|
||||
.valueOf(this.getState().toString()), this.getMasterContainer()
|
||||
.getId());
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return attemptReport;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
|
@ -69,4 +70,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
|
|||
|
||||
ContainerState getContainerState();
|
||||
|
||||
ContainerReport createContainerReport();
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -151,6 +152,8 @@ public class RMContainerImpl implements RMContainer {
|
|||
private ContainerStatus finishedStatus;
|
||||
|
||||
|
||||
|
||||
|
||||
public RMContainerImpl(Container container,
|
||||
ApplicationAttemptId appAttemptId, NodeId nodeId,
|
||||
String user, RMContext rmContext) {
|
||||
|
@ -247,7 +250,11 @@ public class RMContainerImpl implements RMContainer {
|
|||
public String getDiagnosticsInfo() {
|
||||
try {
|
||||
readLock.lock();
|
||||
return finishedStatus.getDiagnostics();
|
||||
if (getFinishedStatus() != null) {
|
||||
return getFinishedStatus().getDiagnostics();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
@ -267,7 +274,11 @@ public class RMContainerImpl implements RMContainer {
|
|||
public int getContainerExitStatus() {
|
||||
try {
|
||||
readLock.lock();
|
||||
return finishedStatus.getExitStatus();
|
||||
if (getFinishedStatus() != null) {
|
||||
return getFinishedStatus().getExitStatus();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
@ -277,7 +288,11 @@ public class RMContainerImpl implements RMContainer {
|
|||
public ContainerState getContainerState() {
|
||||
try {
|
||||
readLock.lock();
|
||||
return finishedStatus.getState();
|
||||
if (getFinishedStatus() != null) {
|
||||
return getFinishedStatus().getState();
|
||||
} else {
|
||||
return ContainerState.RUNNING;
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
@ -312,6 +327,10 @@ public class RMContainerImpl implements RMContainer {
|
|||
}
|
||||
}
|
||||
|
||||
public ContainerStatus getFinishedStatus() {
|
||||
return finishedStatus;
|
||||
}
|
||||
|
||||
private static class BaseTransition implements
|
||||
SingleArcTransition<RMContainerImpl, RMContainerEvent> {
|
||||
|
||||
|
@ -424,4 +443,20 @@ public class RMContainerImpl implements RMContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerReport createContainerReport() {
|
||||
this.readLock.lock();
|
||||
ContainerReport containerReport = null;
|
||||
try {
|
||||
containerReport = ContainerReport.newInstance(this.getContainerId(),
|
||||
this.getAllocatedResource(), this.getAllocatedNode(),
|
||||
this.getAllocatedPriority(), this.getStartTime(),
|
||||
this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(),
|
||||
this.getContainerExitStatus(), this.getContainerState());
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return containerReport;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
|
@ -35,8 +34,8 @@ import java.security.PrivilegedExceptionAction;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -44,10 +43,8 @@ import java.util.concurrent.BrokenBarrierException;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.lang.math.LongRange;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -57,10 +54,18 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
|
@ -73,7 +78,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
|
@ -96,7 +105,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
|
@ -108,6 +122,8 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class TestClientRMService {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
|
||||
|
@ -221,6 +237,113 @@ public class TestClientRMService {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApplicationAttemptReport() throws YarnException,
|
||||
IOException {
|
||||
ClientRMService rmService = createRMService();
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
GetApplicationAttemptReportRequest request = recordFactory
|
||||
.newRecordInstance(GetApplicationAttemptReportRequest.class);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(123456, 1), 1);
|
||||
request.setApplicationAttemptId(attemptId);
|
||||
|
||||
try {
|
||||
GetApplicationAttemptReportResponse response = rmService
|
||||
.getApplicationAttemptReport(request);
|
||||
Assert.assertEquals(attemptId, response.getApplicationAttemptReport()
|
||||
.getApplicationAttemptId());
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApplicationAttempts() throws YarnException, IOException {
|
||||
ClientRMService rmService = createRMService();
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
GetApplicationAttemptsRequest request = recordFactory
|
||||
.newRecordInstance(GetApplicationAttemptsRequest.class);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(123456, 1), 1);
|
||||
request.setApplicationId(ApplicationId.newInstance(123456, 1));
|
||||
|
||||
try {
|
||||
GetApplicationAttemptsResponse response = rmService
|
||||
.getApplicationAttempts(request);
|
||||
Assert.assertEquals(1, response.getApplicationAttemptList().size());
|
||||
Assert.assertEquals(attemptId, response.getApplicationAttemptList()
|
||||
.get(0).getApplicationAttemptId());
|
||||
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetContainerReport() throws YarnException, IOException {
|
||||
ClientRMService rmService = createRMService();
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
GetContainerReportRequest request = recordFactory
|
||||
.newRecordInstance(GetContainerReportRequest.class);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(123456, 1), 1);
|
||||
ContainerId containerId = ContainerId.newInstance(attemptId, 1);
|
||||
request.setContainerId(containerId);
|
||||
|
||||
try {
|
||||
GetContainerReportResponse response = rmService
|
||||
.getContainerReport(request);
|
||||
Assert.assertEquals(containerId, response.getContainerReport()
|
||||
.getContainerId());
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetContainers() throws YarnException, IOException {
|
||||
ClientRMService rmService = createRMService();
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
GetContainersRequest request = recordFactory
|
||||
.newRecordInstance(GetContainersRequest.class);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(123456, 1), 1);
|
||||
ContainerId containerId = ContainerId.newInstance(attemptId, 1);
|
||||
request.setApplicationAttemptId(attemptId);
|
||||
try {
|
||||
GetContainersResponse response = rmService.getContainers(request);
|
||||
Assert.assertEquals(containerId, response.getContainerList().get(0)
|
||||
.getContainerId());
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public ClientRMService createRMService() throws IOException {
|
||||
YarnScheduler yarnScheduler = mockYarnScheduler();
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
mockRMContext(yarnScheduler, rmContext);
|
||||
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
|
||||
yarnScheduler);
|
||||
when(rmContext.getRMApps()).thenReturn(apps);
|
||||
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null,
|
||||
mock(ApplicationACLsManager.class), new Configuration());
|
||||
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
|
||||
new EventHandler<Event>() {
|
||||
public void handle(Event event) {
|
||||
}
|
||||
});
|
||||
|
||||
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(
|
||||
mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString())).thenReturn(true);
|
||||
return new ClientRMService(rmContext, yarnScheduler, appManager,
|
||||
mockAclsManager, mockQueueACLsManager, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceKillNonExistingApplication() throws YarnException {
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
|
@ -732,6 +855,8 @@ public class TestClientRMService {
|
|||
when(rmContext.getRMApps()).thenReturn(apps);
|
||||
when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
|
||||
getSchedulerApps(apps));
|
||||
ResourceScheduler rs = mock(ResourceScheduler.class);
|
||||
when(rmContext.getScheduler()).thenReturn(rs);
|
||||
}
|
||||
|
||||
private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
|
||||
|
@ -772,13 +897,41 @@ public class TestClientRMService {
|
|||
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
|
||||
ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
|
||||
when(asContext.getMaxAppAttempts()).thenReturn(1);
|
||||
RMAppImpl app = spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
|
||||
queueName, asContext, yarnScheduler, null , System
|
||||
.currentTimeMillis(), "YARN", null));
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
|
||||
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
|
||||
rmContext, yarnScheduler, null, asContext, config, false);
|
||||
RMAppImpl app = spy(new RMAppImpl(applicationId3, rmContext, config, null,
|
||||
null, queueName, asContext, yarnScheduler, null,
|
||||
System.currentTimeMillis(), "YARN", null));
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||
ApplicationId.newInstance(123456, 1), 1);
|
||||
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
|
||||
rmContext, yarnScheduler, null, asContext, config, false));
|
||||
Container container = Container.newInstance(
|
||||
ContainerId.newInstance(attemptId, 1), null, "", null, null, null);
|
||||
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
|
||||
attemptId, null, "", rmContext));
|
||||
Map<ApplicationAttemptId, RMAppAttempt> attempts =
|
||||
new HashMap<ApplicationAttemptId, RMAppAttempt>();
|
||||
attempts.put(attemptId, rmAppAttemptImpl);
|
||||
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
|
||||
when(app.getAppAttempts()).thenReturn(attempts);
|
||||
when(rmAppAttemptImpl.getMasterContainer()).thenReturn(container);
|
||||
ResourceScheduler rs = mock(ResourceScheduler.class);
|
||||
when(rmContext.getScheduler()).thenReturn(rs);
|
||||
when(rmContext.getScheduler().getRMContainer(any(ContainerId.class)))
|
||||
.thenReturn(containerimpl);
|
||||
SchedulerAppReport sAppReport = mock(SchedulerAppReport.class);
|
||||
when(
|
||||
rmContext.getScheduler().getSchedulerAppInfo(
|
||||
any(ApplicationAttemptId.class))).thenReturn(sAppReport);
|
||||
List<RMContainer> rmContainers = new ArrayList<RMContainer>();
|
||||
rmContainers.add(containerimpl);
|
||||
when(
|
||||
rmContext.getScheduler().getSchedulerAppInfo(attemptId)
|
||||
.getLiveContainers()).thenReturn(rmContainers);
|
||||
ContainerStatus cs = mock(ContainerStatus.class);
|
||||
when(containerimpl.getFinishedStatus()).thenReturn(cs);
|
||||
when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A");
|
||||
when(containerimpl.getContainerExitStatus()).thenReturn(0);
|
||||
when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE);
|
||||
return app;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue