diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 1399b2f4b22..b02b3e155fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -75,6 +75,12 @@ public final class RouterMetrics { private MutableGaugeInt numListReservationsFailedRetrieved; @Metric("# of getResourceTypeInfo failed to be retrieved") private MutableGaugeInt numGetResourceTypeInfo; + @Metric("# of failApplicationAttempt failed to be retrieved") + private MutableGaugeInt numFailAppAttemptFailedRetrieved; + @Metric("# of updateApplicationPriority failed to be retrieved") + private MutableGaugeInt numUpdateAppPriorityFailedRetrieved; + @Metric("# of updateApplicationPriority failed to be retrieved") + private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -114,6 +120,12 @@ public final class RouterMetrics { private MutableRate totalSucceededListReservationsRetrieved; @Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)") private MutableRate totalSucceededGetResourceTypeInfoRetrieved; + @Metric("Total number of successful Retrieved failApplicationAttempt and latency(ms)") + private MutableRate totalSucceededFailAppAttemptRetrieved; + @Metric("Total number of successful Retrieved updateApplicationPriority and latency(ms)") + private MutableRate totalSucceededUpdateAppPriorityRetrieved; + @Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)") + private MutableRate totalSucceededUpdateAppTimeoutsRetrieved; /** * Provide quantile counters for all latencies. @@ -135,8 +147,11 @@ public final class RouterMetrics { private MutableQuantiles getContainerLatency; private MutableQuantiles listReservationsLatency; private MutableQuantiles listResourceTypeInfoLatency; + private MutableQuantiles failAppAttemptLatency; + private MutableQuantiles updateAppPriorityLatency; + private MutableQuantiles updateAppTimeoutsLatency; - private static volatile RouterMetrics INSTANCE = null; + private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; private RouterMetrics() { @@ -201,25 +216,37 @@ public final class RouterMetrics { listResourceTypeInfoLatency = registry.newQuantiles("getResourceTypeInfoLatency", "latency of get resource type info", "ops", "latency", 10); + + failAppAttemptLatency = + registry.newQuantiles("failApplicationAttemptLatency", + "latency of fail application attempt", "ops", "latency", 10); + + updateAppPriorityLatency = + registry.newQuantiles("updateApplicationPriorityLatency", + "latency of update application priority", "ops", "latency", 10); + + updateAppTimeoutsLatency = + registry.newQuantiles("updateApplicationTimeoutsLatency", + "latency of update application timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { if (!isInitialized.get()) { synchronized (RouterMetrics.class) { - if (INSTANCE == null) { - INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics", + if (instance == null) { + instance = DefaultMetricsSystem.instance().register("RouterMetrics", "Metrics for the Yarn Router", new RouterMetrics()); isInitialized.set(true); } } } - return INSTANCE; + return instance; } @VisibleForTesting synchronized static void destroy() { isInitialized.set(false); - INSTANCE = null; + instance = null; } @VisibleForTesting @@ -307,6 +334,21 @@ public final class RouterMetrics { return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededFailAppAttemptRetrieved() { + return totalSucceededFailAppAttemptRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededUpdateAppPriorityRetrieved() { + return totalSucceededUpdateAppPriorityRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededUpdateAppTimeoutsRetrieved() { + return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -392,6 +434,21 @@ public final class RouterMetrics { return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededFailAppAttemptRetrieved() { + return totalSucceededFailAppAttemptRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededUpdateAppPriorityRetrieved() { + return totalSucceededUpdateAppPriorityRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededUpdateAppTimeoutsRetrieved() { + return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -477,6 +534,21 @@ public final class RouterMetrics { return numGetResourceTypeInfo.value(); } + @VisibleForTesting + public int getFailApplicationAttemptFailedRetrieved() { + return numFailAppAttemptFailedRetrieved.value(); + } + + @VisibleForTesting + public int getUpdateApplicationPriorityFailedRetrieved() { + return numUpdateAppPriorityFailedRetrieved.value(); + } + + @VisibleForTesting + public int getUpdateApplicationTimeoutsFailedRetrieved() { + return numUpdateAppTimeoutsFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -562,6 +634,21 @@ public final class RouterMetrics { listResourceTypeInfoLatency.add(duration); } + public void succeededFailAppAttemptRetrieved(long duration) { + totalSucceededFailAppAttemptRetrieved.add(duration); + failAppAttemptLatency.add(duration); + } + + public void succeededUpdateAppPriorityRetrieved(long duration) { + totalSucceededUpdateAppPriorityRetrieved.add(duration); + updateAppPriorityLatency.add(duration); + } + + public void succeededUpdateAppTimeoutsRetrieved(long duration) { + totalSucceededUpdateAppTimeoutsRetrieved.add(duration); + updateAppTimeoutsLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -629,4 +716,16 @@ public final class RouterMetrics { public void incrResourceTypeInfoFailedRetrieved() { numGetResourceTypeInfo.incr(); } + + public void incrFailAppAttemptFailedRetrieved() { + numFailAppAttemptFailedRetrieved.incr(); + } + + public void incrUpdateAppPriorityFailedRetrieved() { + numUpdateAppPriorityFailedRetrieved.incr(); + } + + public void incrUpdateApplicationTimeoutsRetrieved() { + numUpdateAppTimeoutsFailedRetrieved.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index f92e3566ca2..fec62d4b080 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1213,14 +1213,92 @@ public class FederationClientInterceptor @Override public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationAttemptId() == null + || request.getApplicationAttemptId().getApplicationId() == null) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing failApplicationAttempt request or applicationId " + + "or applicationAttemptId information.", null); + } + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId(); + + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("ApplicationAttempt " + + request.getApplicationAttemptId() + " belongs to Application " + + request.getApplicationAttemptId().getApplicationId() + + " does not exist in FederationStateStore.", e); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + FailApplicationAttemptResponse response = null; + try { + response = clientRMProxy.failApplicationAttempt(request); + } catch (Exception e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get the applicationAttempt report for " + + request.getApplicationAttemptId() + " to SubCluster " + subClusterId.getId(), e); + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the report of " + + "the applicationAttempt {} to SubCluster {}.", + request.getApplicationAttemptId(), subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededFailAppAttemptRetrieved(stopTime - startTime); + return response; } @Override public UpdateApplicationPriorityResponse updateApplicationPriority( UpdateApplicationPriorityRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationId() == null + || request.getApplicationPriority() == null) { + routerMetrics.incrUpdateAppPriorityFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing updateApplicationPriority request or applicationId " + + "or applicationPriority information.", null); + } + + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = request.getApplicationId(); + + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException e) { + routerMetrics.incrUpdateAppPriorityFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + + request.getApplicationId() + + " does not exist in FederationStateStore.", e); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + UpdateApplicationPriorityResponse response = null; + try { + response = clientRMProxy.updateApplicationPriority(request); + } catch (Exception e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to update application priority for " + + request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e); + } + + if (response == null) { + LOG.error("No response when update application priority of " + + "the applicationId {} to SubCluster {}.", + applicationId, subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime); + return response; } @Override @@ -1233,7 +1311,45 @@ public class FederationClientInterceptor public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( UpdateApplicationTimeoutsRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationId() == null + || request.getApplicationTimeouts() == null) { + routerMetrics.incrUpdateApplicationTimeoutsRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing updateApplicationTimeouts request or applicationId " + + "or applicationTimeouts information.", null); + } + + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = request.getApplicationId(); + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + + request.getApplicationId() + + " does not exist in FederationStateStore.", e); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + UpdateApplicationTimeoutsResponse response = null; + try { + response = clientRMProxy.updateApplicationTimeouts(request); + } catch (Exception e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to update application timeout for " + + request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e); + } + + if (response == null) { + LOG.error("No response when update application timeout of " + + "the applicationId {} to SubCluster {}.", + applicationId, subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime); + return response; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index a4df82f9dcb..4b1049e8b64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -398,6 +398,21 @@ public class TestRouterMetrics { LOG.info("Mocked: failed getResourceTypeInfo call"); metrics.incrResourceTypeInfoFailedRetrieved(); } + + public void getFailApplicationAttempt() { + LOG.info("Mocked: failed failApplicationAttempt call"); + metrics.incrFailAppAttemptFailedRetrieved(); + } + + public void getUpdateApplicationPriority() { + LOG.info("Mocked: failed updateApplicationPriority call"); + metrics.incrUpdateAppPriorityFailedRetrieved(); + } + + public void getUpdateApplicationTimeouts() { + LOG.info("Mocked: failed updateApplicationTimeouts call"); + metrics.incrUpdateApplicationTimeoutsRetrieved(); + } } // Records successes for all calls @@ -493,6 +508,21 @@ public class TestRouterMetrics { LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration); metrics.succeededGetResourceTypeInfoRetrieved(duration); } + + public void getFailApplicationAttempt(long duration) { + LOG.info("Mocked: successful failApplicationAttempt call with duration {}", duration); + metrics.succeededFailAppAttemptRetrieved(duration); + } + + public void getUpdateApplicationPriority(long duration) { + LOG.info("Mocked: successful updateApplicationPriority call with duration {}", duration); + metrics.succeededUpdateAppPriorityRetrieved(duration); + } + + public void getUpdateApplicationTimeouts(long duration) { + LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration); + metrics.succeededUpdateAppTimeoutsRetrieved(duration); + } } @Test @@ -708,4 +738,72 @@ public class TestRouterMetrics { Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved()); } + @Test + public void testSucceededFailApplicationAttempt() { + long totalGoodBefore = metrics.getNumSucceededFailAppAttemptRetrieved(); + goodSubCluster.getFailApplicationAttempt(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededFailAppAttemptRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getFailApplicationAttempt(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededFailAppAttemptRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testFailApplicationAttemptFailed() { + long totalBadBefore = metrics.getFailApplicationAttemptFailedRetrieved(); + badSubCluster.getFailApplicationAttempt(); + Assert.assertEquals(totalBadBefore + 1, metrics.getFailApplicationAttemptFailedRetrieved()); + } + + @Test + public void testSucceededUpdateApplicationPriority() { + long totalGoodBefore = metrics.getNumSucceededUpdateAppPriorityRetrieved(); + goodSubCluster.getUpdateApplicationPriority(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededUpdateAppPriorityRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getUpdateApplicationPriority(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededUpdateAppPriorityRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testUpdateApplicationPriorityFailed() { + long totalBadBefore = metrics.getUpdateApplicationPriorityFailedRetrieved(); + badSubCluster.getUpdateApplicationPriority(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getUpdateApplicationPriorityFailedRetrieved()); + } + + @Test + public void testSucceededUpdateAppTimeoutsRetrieved() { + long totalGoodBefore = metrics.getNumSucceededUpdateAppTimeoutsRetrieved(); + goodSubCluster.getUpdateApplicationTimeouts(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededUpdateAppTimeoutsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getUpdateApplicationTimeouts(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededUpdateAppTimeoutsRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testUpdateAppTimeoutsFailed() { + long totalBadBefore = metrics.getUpdateApplicationTimeoutsFailedRetrieved(); + badSubCluster.getUpdateApplicationTimeouts(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getUpdateApplicationTimeoutsFailedRetrieved()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 8fa52e8f92b..9ead9fbe721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.Set; import java.util.stream.Collectors; @@ -65,6 +66,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -75,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -82,7 +90,12 @@ import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationState import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -109,6 +122,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { private final static int NUM_SUBCLUSTER = 4; + private final static int APP_PRIORITY_ZERO = 0; + @Override public void setUp() { super.setUpConfig(); @@ -212,7 +227,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ApplicationSubmissionContext context = ApplicationSubmissionContext .newInstance(appId, MockApps.newAppName(), "default", - Priority.newInstance(0), amContainerSpec, false, false, -1, + Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1, Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), "MockApp"); @@ -898,4 +913,147 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance()); Assert.assertEquals(2, response.getResourceTypeInfo().size()); } + + @Test + public void testFailApplicationAttempt() throws Exception { + LOG.info("Test FederationClientInterceptor : Fail Application Attempt request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " + + "or applicationId or applicationAttemptId information.", + () -> interceptor.failApplicationAttempt(null)); + + // normal request + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + Assert.assertNotNull(attemptsResponse); + + ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList(). + get(0).getApplicationAttemptId(); + + FailApplicationAttemptRequest requestFailAppAttempt = + FailApplicationAttemptRequest.newInstance(attemptId); + FailApplicationAttemptResponse responseFailAppAttempt = + interceptor.failApplicationAttempt(requestFailAppAttempt); + + Assert.assertNotNull(responseFailAppAttempt); + } + + @Test + public void testUpdateApplicationPriority() throws Exception { + LOG.info("Test FederationClientInterceptor : Update Application Priority request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " + + "or applicationId or applicationPriority information.", + () -> interceptor.updateApplicationPriority(null)); + + // normal request + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + Assert.assertNotNull(attemptsResponse); + + Priority priority = Priority.newInstance(20); + UpdateApplicationPriorityRequest requestUpdateAppPriority = + UpdateApplicationPriorityRequest.newInstance(appId, priority); + UpdateApplicationPriorityResponse responseAppPriority = + interceptor.updateApplicationPriority(requestUpdateAppPriority); + + Assert.assertNotNull(responseAppPriority); + Assert.assertEquals(20, + responseAppPriority.getApplicationPriority().getPriority()); + } + + @Test + public void testUpdateApplicationTimeouts() throws Exception { + LOG.info("Test FederationClientInterceptor : Update Application Timeouts request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " + + "or applicationId or applicationTimeouts information.", + () -> interceptor.updateApplicationTimeouts(null)); + + // normal request + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + Assert.assertNotNull(attemptsResponse); + + String appTimeout = + Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); + Map applicationTimeouts = new HashMap<>(); + applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout); + + UpdateApplicationTimeoutsRequest timeoutsRequest = + UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts); + UpdateApplicationTimeoutsResponse timeoutsResponse = + interceptor.updateApplicationTimeouts(timeoutsRequest); + + String responseTimeOut = + timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); + Assert.assertNotNull(timeoutsResponse); + Assert.assertEquals(appTimeout, responseTimeOut); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index c97d0532425..202a286696a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -115,4 +115,7 @@ public class TestableFederationClientInterceptor } } + public ConcurrentHashMap getMockRMs() { + return mockRMs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml index f3e0de3604b..310a1612486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml @@ -27,4 +27,8 @@ yarn.resourcemanager.webapp.address 0.0.0.0:8080 + + yarn.cluster.max-application-priority + 50 +