diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 884e06e4ba0..24fdbb9062f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -51,6 +51,8 @@ public final class RouterMetrics { private MutableGaugeInt numAppsFailedRetrieved; @Metric("# of multiple applications reports failed to be retrieved") private MutableGaugeInt numMultipleAppsFailedRetrieved; + @Metric("# of applicationAttempt reports failed to be retrieved") + private MutableGaugeInt numAppAttemptsFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -64,6 +66,10 @@ public final class RouterMetrics { @Metric("Total number of successful Retrieved multiple apps reports and " + "latency(ms)") private MutableRate totalSucceededMultipleAppsRetrieved; + @Metric("Total number of successful Retrieved " + + "appAttempt reports and latency(ms)") + private MutableRate totalSucceededAppAttemptsRetrieved; + /** * Provide quantile counters for all latencies. @@ -73,6 +79,7 @@ public final class RouterMetrics { private MutableQuantiles killApplicationLatency; private MutableQuantiles getApplicationReportLatency; private MutableQuantiles getApplicationsReportLatency; + private MutableQuantiles getApplicationAttemptReportLatency; private static volatile RouterMetrics INSTANCE = null; private static MetricsRegistry registry; @@ -92,6 +99,10 @@ public final class RouterMetrics { getApplicationsReportLatency = registry.newQuantiles("getApplicationsReportLatency", "latency of get applications report", "ops", "latency", 10); + getApplicationAttemptReportLatency = + registry.newQuantiles("getApplicationAttemptReportLatency", + "latency of get applicationattempt " + + "report", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -133,6 +144,11 @@ public final class RouterMetrics { return totalSucceededAppsRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededAppAttemptsRetrieved() { + return totalSucceededAppAttemptsRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededMultipleAppsRetrieved() { return totalSucceededMultipleAppsRetrieved.lastStat().numSamples(); @@ -153,6 +169,11 @@ public final class RouterMetrics { return totalSucceededAppsKilled.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededGetAppAttemptReport() { + return totalSucceededAppAttemptsRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededGetAppReport() { return totalSucceededAppsRetrieved.lastStat().mean(); @@ -183,6 +204,11 @@ public final class RouterMetrics { return numAppsFailedRetrieved.value(); } + @VisibleForTesting + public int getAppAttemptsFailedRetrieved() { + return numAppsFailedRetrieved.value(); + } + @VisibleForTesting public int getMultipleAppsFailedRetrieved() { return numMultipleAppsFailedRetrieved.value(); @@ -213,6 +239,11 @@ public final class RouterMetrics { getApplicationsReportLatency.add(duration); } + public void succeededAppAttemptsRetrieved(long duration) { + totalSucceededAppAttemptsRetrieved.add(duration); + getApplicationAttemptReportLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -233,4 +264,8 @@ public final class RouterMetrics { numMultipleAppsFailedRetrieved.incr(); } -} \ No newline at end of file + public void incrAppAttemptsFailedRetrieved() { + numAppAttemptsFailedRetrieved.incr(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index a721fe0d8ec..7e8e7af3c7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -749,11 +749,79 @@ public class FederationClientInterceptor throw new NotImplementedException("Code is not implemented"); } + /** + * The YARN Router will forward to the respective YARN RM in which the AM is + * running. + * + * Possible failure: + * + * Client: identical behavior as {@code ClientRMService}. + * + * Router: the Client will timeout and resubmit the request. + * + * ResourceManager: the Router will timeout and the call will fail. + * + * State Store: the Router will timeout and it will retry depending on the + * FederationFacade settings - if the failure happened before the select + * operation. + */ @Override public GetApplicationAttemptReportResponse getApplicationAttemptReport( GetApplicationAttemptReportRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long startTime = clock.getTime(); + + if (request == null || request.getApplicationAttemptId() == null + || request.getApplicationAttemptId().getApplicationId() == null) { + routerMetrics.incrAppAttemptsFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing getApplicationAttemptReport " + + "request or applicationId " + + "or applicationAttemptId information.", + null); + } + + SubClusterId subClusterId = null; + + try { + subClusterId = federationFacade + .getApplicationHomeSubCluster( + request.getApplicationAttemptId().getApplicationId()); + } catch (YarnException e) { + routerMetrics.incrAppAttemptsFailedRetrieved(); + RouterServerUtil + .logAndThrowException("ApplicationAttempt " + + request.getApplicationAttemptId() + + "belongs to Application " + + request.getApplicationAttemptId().getApplicationId() + + " does not exist in FederationStateStore", e); + } + + ApplicationClientProtocol clientRMProxy = + getClientRMProxyForSubCluster(subClusterId); + + GetApplicationAttemptReportResponse response = null; + try { + response = clientRMProxy.getApplicationAttemptReport(request); + } catch (Exception e) { + routerMetrics.incrAppAttemptsFailedRetrieved(); + LOG.error("Unable to get the applicationAttempt report for " + + request.getApplicationAttemptId() + "to SubCluster " + + subClusterId.getId(), e); + throw e; + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the report of " + + "the applicationAttempt " + + request.getApplicationAttemptId() + " to SubCluster " + + subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededAppAttemptsRetrieved(stopTime - startTime); + return response; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 4c18ace8611..1456a42e6bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -47,11 +47,15 @@ public class TestRouterMetrics { Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted()); Assert.assertEquals(0, metrics.getNumSucceededAppsKilled()); Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved()); + Assert.assertEquals(0, + metrics.getNumSucceededAppAttemptsRetrieved()); Assert.assertEquals(0, metrics.getAppsFailedCreated()); Assert.assertEquals(0, metrics.getAppsFailedSubmitted()); Assert.assertEquals(0, metrics.getAppsFailedKilled()); Assert.assertEquals(0, metrics.getAppsFailedRetrieved()); + Assert.assertEquals(0, + metrics.getAppAttemptsFailedRetrieved()); LOG.info("Test: aggregate metrics are updated correctly"); } @@ -196,6 +200,46 @@ public class TestRouterMetrics { Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved()); } + /** + * This test validates the correctness of the metric: + * Retrieved AppAttempt Report + * successfully. + */ + @Test + public void testSucceededAppAttemptReport() { + + long totalGoodBefore = metrics.getNumSucceededAppAttemptsRetrieved(); + + goodSubCluster.getApplicationAttemptReport(100); + + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededAppAttemptsRetrieved()); + Assert.assertEquals(100, + metrics.getLatencySucceededGetAppAttemptReport(), 0); + + goodSubCluster.getApplicationAttemptReport(200); + + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededAppAttemptsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetAppAttemptReport(), 0); + } + + /** + * This test validates the correctness of the metric: + * Failed to retrieve AppAttempt Report. + */ + @Test + public void testAppAttemptReportFailed() { + + long totalBadbefore = metrics.getAppAttemptsFailedRetrieved(); + + badSubCluster.getApplicationAttemptReport(); + + Assert.assertEquals(totalBadbefore + 1, + metrics.getAppAttemptsFailedRetrieved()); + } + /** * This test validates the correctness of the metric: Retrieved Multiple Apps * successfully. @@ -257,6 +301,11 @@ public class TestRouterMetrics { metrics.incrAppsFailedRetrieved(); } + public void getApplicationAttemptReport() { + LOG.info("Mocked: failed getApplicationAttemptReport call"); + metrics.incrAppsFailedRetrieved(); + } + public void getApplicationsReport() { LOG.info("Mocked: failed getApplicationsReport call"); metrics.incrMultipleAppsFailedRetrieved(); @@ -289,6 +338,13 @@ public class TestRouterMetrics { metrics.succeededAppsRetrieved(duration); } + public void getApplicationAttemptReport(long duration) { + LOG.info("Mocked: successful " + + "getApplicationAttemptReport call with duration {}", + duration); + metrics.succeededAppAttemptsRetrieved(duration); + } + public void getApplicationsReport(long duration) { LOG.info("Mocked: successful getApplicationsReport call with duration {}", duration); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index ee6e7b8eaf6..125dfcfbeee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -26,9 +26,12 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.Priority; @@ -177,7 +181,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { ApplicationId appId) { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ApplicationSubmissionContext context = ApplicationSubmissionContext - .newInstance(appId, MockApps.newAppName(), "q1", + .newInstance(appId, MockApps.newAppName(), "default", Priority.newInstance(0), amContainerSpec, false, false, -1, Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), @@ -410,6 +414,102 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { } } + /** + * This test validates the correctness of + * GetApplicationAttemptReport in case the + * application exists in the cluster. + */ + @Test + public void testGetApplicationAttemptReport() + throws YarnException, IOException, InterruptedException { + LOG.info("Test FederationClientInterceptor: " + + "Get ApplicationAttempt Report"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application we want the applicationAttempt report later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + GetApplicationAttemptReportRequest requestGet = + GetApplicationAttemptReportRequest.newInstance(appAttemptId); + + GetApplicationAttemptReportResponse responseGet = + interceptor.getApplicationAttemptReport(requestGet); + + Assert.assertNotNull(responseGet); + } + + /** + * This test validates the correctness of + * GetApplicationAttemptReport in case the + * application does not exist in StateStore. + */ + @Test + public void testGetApplicationAttemptNotExists() + throws Exception { + LOG.info( + "Test ApplicationClientProtocol: " + + "Get ApplicationAttempt Report - Not Exists"); + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptID = + ApplicationAttemptId.newInstance(appId, 1); + GetApplicationAttemptReportRequest requestGet = + GetApplicationAttemptReportRequest.newInstance(appAttemptID); + + LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " + + appAttemptID + "belongs to Application " + + appId + " does not exist in FederationStateStore", + () -> interceptor.getApplicationAttemptReport(requestGet)); + } + + /** + * This test validates + * the correctness of GetApplicationAttemptReport in case of + * empty request. + */ + @Test + public void testGetApplicationAttemptEmptyRequest() + throws Exception { + LOG.info("Test FederationClientInterceptor: " + + "Get ApplicationAttempt Report - Empty"); + + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplicationAttemptReport " + + "request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor.getApplicationAttemptReport(null)); + + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplicationAttemptReport " + + "request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor + .getApplicationAttemptReport( + GetApplicationAttemptReportRequest + .newInstance(null))); + + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplicationAttemptReport " + + "request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor + .getApplicationAttemptReport( + GetApplicationAttemptReportRequest.newInstance( + ApplicationAttemptId + .newInstance(null, 1) + ))); + } + + @Test public void testGetClusterMetricsRequest() throws YarnException, IOException { LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");