YARN-10463: For Federation, we should support getApplicationAttemptReport. (#2563)

Qi Zhu via Zhankun Tang
This commit is contained in:
zhuqi 2020-12-21 10:04:16 +08:00 committed by GitHub
parent 2aea43bf4f
commit bb528e3239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 262 additions and 3 deletions

View File

@ -51,6 +51,8 @@ public final class RouterMetrics {
private MutableGaugeInt numAppsFailedRetrieved;
@Metric("# of multiple applications reports failed to be retrieved")
private MutableGaugeInt numMultipleAppsFailedRetrieved;
@Metric("# of applicationAttempt reports failed to be retrieved")
private MutableGaugeInt numAppAttemptsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -64,6 +66,10 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved multiple apps reports and "
+ "latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
@Metric("Total number of successful Retrieved " +
"appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
/**
* Provide quantile counters for all latencies.
@ -73,6 +79,7 @@ public final class RouterMetrics {
private MutableQuantiles killApplicationLatency;
private MutableQuantiles getApplicationReportLatency;
private MutableQuantiles getApplicationsReportLatency;
private MutableQuantiles getApplicationAttemptReportLatency;
private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
@ -92,6 +99,10 @@ public final class RouterMetrics {
getApplicationsReportLatency =
registry.newQuantiles("getApplicationsReportLatency",
"latency of get applications report", "ops", "latency", 10);
getApplicationAttemptReportLatency =
registry.newQuantiles("getApplicationAttemptReportLatency",
"latency of get applicationattempt " +
"report", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -133,6 +144,11 @@ public final class RouterMetrics {
return totalSucceededAppsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededAppAttemptsRetrieved() {
return totalSucceededAppAttemptsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededMultipleAppsRetrieved() {
return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
@ -153,6 +169,11 @@ public final class RouterMetrics {
return totalSucceededAppsKilled.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppAttemptReport() {
return totalSucceededAppAttemptsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAppReport() {
return totalSucceededAppsRetrieved.lastStat().mean();
@ -183,6 +204,11 @@ public final class RouterMetrics {
return numAppsFailedRetrieved.value();
}
@VisibleForTesting
public int getAppAttemptsFailedRetrieved() {
return numAppsFailedRetrieved.value();
}
@VisibleForTesting
public int getMultipleAppsFailedRetrieved() {
return numMultipleAppsFailedRetrieved.value();
@ -213,6 +239,11 @@ public final class RouterMetrics {
getApplicationsReportLatency.add(duration);
}
public void succeededAppAttemptsRetrieved(long duration) {
totalSucceededAppAttemptsRetrieved.add(duration);
getApplicationAttemptReportLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -233,4 +264,8 @@ public final class RouterMetrics {
numMultipleAppsFailedRetrieved.incr();
}
public void incrAppAttemptsFailedRetrieved() {
numAppAttemptsFailedRetrieved.incr();
}
}

View File

@ -749,11 +749,79 @@ public class FederationClientInterceptor
throw new NotImplementedException("Code is not implemented");
}
/**
* The YARN Router will forward to the respective YARN RM in which the AM is
* running.
*
* Possible failure:
*
* Client: identical behavior as {@code ClientRMService}.
*
* Router: the Client will timeout and resubmit the request.
*
* ResourceManager: the Router will timeout and the call will fail.
*
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
long startTime = clock.getTime();
if (request == null || request.getApplicationAttemptId() == null
|| request.getApplicationAttemptId().getApplicationId() == null) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getApplicationAttemptReport " +
"request or applicationId " +
"or applicationAttemptId information.",
null);
}
SubClusterId subClusterId = null;
try {
subClusterId = federationFacade
.getApplicationHomeSubCluster(
request.getApplicationAttemptId().getApplicationId());
} catch (YarnException e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil
.logAndThrowException("ApplicationAttempt " +
request.getApplicationAttemptId() +
"belongs to Application " +
request.getApplicationAttemptId().getApplicationId() +
" does not exist in FederationStateStore", e);
}
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetApplicationAttemptReportResponse response = null;
try {
response = clientRMProxy.getApplicationAttemptReport(request);
} catch (Exception e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
LOG.error("Unable to get the applicationAttempt report for "
+ request.getApplicationAttemptId() + "to SubCluster "
+ subClusterId.getId(), e);
throw e;
}
if (response == null) {
LOG.error("No response when attempting to retrieve the report of "
+ "the applicationAttempt "
+ request.getApplicationAttemptId() + " to SubCluster "
+ subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime);
return response;
}
@Override

View File

@ -47,11 +47,15 @@ public class TestRouterMetrics {
Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
Assert.assertEquals(0,
metrics.getNumSucceededAppAttemptsRetrieved());
Assert.assertEquals(0, metrics.getAppsFailedCreated());
Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
Assert.assertEquals(0, metrics.getAppsFailedKilled());
Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
Assert.assertEquals(0,
metrics.getAppAttemptsFailedRetrieved());
LOG.info("Test: aggregate metrics are updated correctly");
}
@ -196,6 +200,46 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
}
/**
* This test validates the correctness of the metric:
* Retrieved AppAttempt Report
* successfully.
*/
@Test
public void testSucceededAppAttemptReport() {
long totalGoodBefore = metrics.getNumSucceededAppAttemptsRetrieved();
goodSubCluster.getApplicationAttemptReport(100);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededAppAttemptsRetrieved());
Assert.assertEquals(100,
metrics.getLatencySucceededGetAppAttemptReport(), 0);
goodSubCluster.getApplicationAttemptReport(200);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededAppAttemptsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAppAttemptReport(), 0);
}
/**
* This test validates the correctness of the metric:
* Failed to retrieve AppAttempt Report.
*/
@Test
public void testAppAttemptReportFailed() {
long totalBadbefore = metrics.getAppAttemptsFailedRetrieved();
badSubCluster.getApplicationAttemptReport();
Assert.assertEquals(totalBadbefore + 1,
metrics.getAppAttemptsFailedRetrieved());
}
/**
* This test validates the correctness of the metric: Retrieved Multiple Apps
* successfully.
@ -257,6 +301,11 @@ public class TestRouterMetrics {
metrics.incrAppsFailedRetrieved();
}
public void getApplicationAttemptReport() {
LOG.info("Mocked: failed getApplicationAttemptReport call");
metrics.incrAppsFailedRetrieved();
}
public void getApplicationsReport() {
LOG.info("Mocked: failed getApplicationsReport call");
metrics.incrMultipleAppsFailedRetrieved();
@ -289,6 +338,13 @@ public class TestRouterMetrics {
metrics.succeededAppsRetrieved(duration);
}
public void getApplicationAttemptReport(long duration) {
LOG.info("Mocked: successful " +
"getApplicationAttemptReport call with duration {}",
duration);
metrics.succeededAppAttemptsRetrieved(duration);
}
public void getApplicationsReport(long duration) {
LOG.info("Mocked: successful getApplicationsReport call with duration {}",
duration);

View File

@ -26,9 +26,12 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
@ -177,7 +181,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
ApplicationId appId) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, MockApps.newAppName(), "q1",
.newInstance(appId, MockApps.newAppName(), "default",
Priority.newInstance(0), amContainerSpec, false, false, -1,
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
@ -410,6 +414,102 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
}
}
/**
* This test validates the correctness of
* GetApplicationAttemptReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationAttemptReport()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: " +
"Get ApplicationAttempt Report");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application we want the applicationAttempt report later
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
GetApplicationAttemptReportRequest requestGet =
GetApplicationAttemptReportRequest.newInstance(appAttemptId);
GetApplicationAttemptReportResponse responseGet =
interceptor.getApplicationAttemptReport(requestGet);
Assert.assertNotNull(responseGet);
}
/**
* This test validates the correctness of
* GetApplicationAttemptReport in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationAttemptNotExists()
throws Exception {
LOG.info(
"Test ApplicationClientProtocol: " +
"Get ApplicationAttempt Report - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptID =
ApplicationAttemptId.newInstance(appId, 1);
GetApplicationAttemptReportRequest requestGet =
GetApplicationAttemptReportRequest.newInstance(appAttemptID);
LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " +
appAttemptID + "belongs to Application " +
appId + " does not exist in FederationStateStore",
() -> interceptor.getApplicationAttemptReport(requestGet));
}
/**
* This test validates
* the correctness of GetApplicationAttemptReport in case of
* empty request.
*/
@Test
public void testGetApplicationAttemptEmptyRequest()
throws Exception {
LOG.info("Test FederationClientInterceptor: " +
"Get ApplicationAttempt Report - Empty");
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport " +
"request or applicationId " +
"or applicationAttemptId information.",
() -> interceptor.getApplicationAttemptReport(null));
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport " +
"request or applicationId " +
"or applicationAttemptId information.",
() -> interceptor
.getApplicationAttemptReport(
GetApplicationAttemptReportRequest
.newInstance(null)));
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplicationAttemptReport " +
"request or applicationId " +
"or applicationAttemptId information.",
() -> interceptor
.getApplicationAttemptReport(
GetApplicationAttemptReportRequest.newInstance(
ApplicationAttemptId
.newInstance(null, 1)
)));
}
@Test
public void testGetClusterMetricsRequest() throws YarnException, IOException {
LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");