From 55eebcf277da3fb3a8af2c316c36c63be3c3ed65 Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Wed, 26 Apr 2023 05:12:38 +0800
Subject: [PATCH] YARN-11378. [Federation] Support checkForDecommissioningNodes,
 refreshClusterMaxPriority APIs for Federation. (#5551)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../CheckForDecommissioningNodesRequest.java  | 29 ++++++
 .../RefreshClusterMaxPriorityRequest.java     | 29 ++++++
 ...erver_resourcemanager_service_protos.proto |  2 +
 ...kForDecommissioningNodesRequestPBImpl.java | 24 +++++
 ...efreshClusterMaxPriorityRequestPBImpl.java | 28 +++++-
 .../yarn/server/router/RouterMetrics.java     | 63 +++++++++++++
 .../rmadmin/FederationRMAdminInterceptor.java | 89 +++++++++++++++++--
 .../yarn/server/router/TestRouterMetrics.java | 67 ++++++++++++++
 .../TestFederationRMAdminInterceptor.java     | 29 ++++++
 9 files changed, 351 insertions(+), 9 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/CheckForDecommissioningNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/CheckForDecommissioningNodesRequest.java
index 27dee91b6d3..e7827a25060 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/CheckForDecommissioningNodesRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/CheckForDecommissioningNodesRequest.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.yarn.util.Records;
 
 @Private
@@ -32,4 +34,31 @@ public abstract class CheckForDecommissioningNodesRequest {
         .newRecord(CheckForDecommissioningNodesRequest.class);
     return request;
   }
+
+  @Private
+  @Unstable
+  public static CheckForDecommissioningNodesRequest newInstance(String subClusterId) {
+    CheckForDecommissioningNodesRequest request = Records
+        .newRecord(CheckForDecommissioningNodesRequest.class);
+    request.setSubClusterId(subClusterId);
+    return request;
+  }
+
+  /**
+   * Get the subClusterId.
+   *
+   * @return subClusterId.
+   */
+  @Public
+  @Evolving
+  public abstract String getSubClusterId();
+
+  /**
+   * Set the subClusterId.
+   *
+   * @param subClusterId subCluster Id.
+   */
+  @Public
+  @Evolving
+  public abstract void setSubClusterId(String subClusterId);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshClusterMaxPriorityRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshClusterMaxPriorityRequest.java
index 1c63e2adaa3..3d54de1f318 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshClusterMaxPriorityRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshClusterMaxPriorityRequest.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.yarn.util.Records;
 
 @Private
@@ -32,4 +34,31 @@ public abstract class RefreshClusterMaxPriorityRequest {
         Records.newRecord(RefreshClusterMaxPriorityRequest.class);
     return request;
   }
+
+  @Private
+  @Unstable
+  public static RefreshClusterMaxPriorityRequest newInstance(String subClusterId) {
+    RefreshClusterMaxPriorityRequest request =
+        Records.newRecord(RefreshClusterMaxPriorityRequest.class);
+    request.setSubClusterId(subClusterId);
+    return request;
+  }
+
+  /**
+   * Get the subClusterId.
+   *
+   * @return subClusterId.
+   */
+  @Public
+  @Evolving
+  public abstract String getSubClusterId();
+
+  /**
+   * Set the subClusterId.
+   *
+   * @param subClusterId subCluster Id.
+   */
+  @Public
+  @Evolving
+  public abstract void setSubClusterId(String subClusterId);
 }
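Both request records follow the same pattern: the existing no-argument newInstance() factory is preserved for compatibility, and the new overload threads the target subCluster through the record. A minimal caller-side sketch (the subCluster id "SC-1" is illustrative only, not part of this patch):

    // Build federation-aware admin requests; a blank subClusterId is rejected by the Router.
    CheckForDecommissioningNodesRequest checkRequest =
        CheckForDecommissioningNodesRequest.newInstance("SC-1");
    RefreshClusterMaxPriorityRequest refreshRequest =
        RefreshClusterMaxPriorityRequest.newInstance("SC-1");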
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index f2145ca73d0..141c9025363 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -122,12 +122,14 @@ message UpdateNodeLabelsResponseProto {
 }
 
 message CheckForDecommissioningNodesRequestProto {
+  optional string sub_cluster_id = 1;
 }
 message CheckForDecommissioningNodesResponseProto {
   repeated NodeIdProto decommissioningNodes = 1;
 }
 
 message RefreshClusterMaxPriorityRequestProto {
+  optional string sub_cluster_id = 1;
 }
 message RefreshClusterMaxPriorityResponseProto {
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/CheckForDecommissioningNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/CheckForDecommissioningNodesRequestPBImpl.java
index 96c13c17795..100ee44d7c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/CheckForDecommissioningNodesRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/CheckForDecommissioningNodesRequestPBImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
 
@@ -51,6 +52,13 @@ public class CheckForDecommissioningNodesRequestPBImpl extends
     return proto;
   }
 
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = CheckForDecommissioningNodesRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();
@@ -70,4 +78,20 @@ public class CheckForDecommissioningNodesRequestPBImpl extends
   public String toString() {
     return TextFormat.shortDebugString(getProto());
   }
+
+  @Override
+  public String getSubClusterId() {
+    CheckForDecommissioningNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
+  }
+
+  @Override
+  public void setSubClusterId(String subClusterId) {
+    maybeInitBuilder();
+    if (subClusterId == null) {
+      builder.clearSubClusterId();
+      return;
+    }
+    builder.setSubClusterId(subClusterId);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshClusterMaxPriorityRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshClusterMaxPriorityRequestPBImpl.java
index 4d2f766b35c..17ce7532e1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshClusterMaxPriorityRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshClusterMaxPriorityRequestPBImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
 
@@ -27,9 +28,7 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 
 @Private
 @Unstable
-public class RefreshClusterMaxPriorityRequestPBImpl
-    extends
-    RefreshClusterMaxPriorityRequest {
+public class RefreshClusterMaxPriorityRequestPBImpl extends RefreshClusterMaxPriorityRequest {
 
   RefreshClusterMaxPriorityRequestProto proto =
       RefreshClusterMaxPriorityRequestProto.getDefaultInstance();
@@ -51,6 +50,13 @@ public class RefreshClusterMaxPriorityRequestPBImpl
     viaProto = true;
     return proto;
   }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = RefreshClusterMaxPriorityRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
 
   @Override
   public int hashCode() {
@@ -71,4 +77,20 @@ public class RefreshClusterMaxPriorityRequestPBImpl
   public String toString() {
     return TextFormat.shortDebugString(getProto());
   }
+
+  @Override
+  public String getSubClusterId() {
+    RefreshClusterMaxPriorityRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
+  }
+
+  @Override
+  public void setSubClusterId(String subClusterId) {
+    maybeInitBuilder();
+    if (subClusterId == null) {
+      builder.clearSubClusterId();
+      return;
+    }
+    builder.setSubClusterId(subClusterId);
+  }
 }
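maybeInitBuilder() is the usual YARN PBImpl copy-on-write idiom: mutating a record that is currently backed by an immutable proto first copies that proto into a builder, and getProto() later freezes the builder back into a proto. A round-trip sketch, assuming the standard proto-accepting PBImpl constructor these classes provide alongside the no-argument one (not shown in this diff):

    // Set the field, serialize to the proto form, and parse it back.
    CheckForDecommissioningNodesRequestPBImpl request =
        new CheckForDecommissioningNodesRequestPBImpl();
    request.setSubClusterId("SC-1");                  // maybeInitBuilder() switches to builder mode
    CheckForDecommissioningNodesRequestProto proto = request.getProto();

    CheckForDecommissioningNodesRequestPBImpl parsed =
        new CheckForDecommissioningNodesRequestPBImpl(proto);
    String subClusterId = parsed.getSubClusterId();   // "SC-1"; null if the field was never set

Because sub_cluster_id is an optional proto field, an old client that still sends the empty message parses cleanly, and getSubClusterId() reports null rather than an empty string.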
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index a84a315b93c..a2627c5a584 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -171,6 +171,10 @@ public final class RouterMetrics {
   private MutableGaugeInt numUpdateNodeResourceFailedRetrieved;
   @Metric("# of refreshNodesResources failed to be retrieved")
   private MutableGaugeInt numRefreshNodesResourcesFailedRetrieved;
+  @Metric("# of checkForDecommissioningNodes failed to be retrieved")
+  private MutableGaugeInt numCheckForDecommissioningNodesFailedRetrieved;
+  @Metric("# of refreshClusterMaxPriority failed to be retrieved")
+  private MutableGaugeInt numRefreshClusterMaxPriorityFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -303,6 +307,10 @@ public final class RouterMetrics {
   private MutableRate totalSucceededUpdateNodeResourceRetrieved;
   @Metric("Total number of successful Retrieved RefreshNodesResources and latency(ms)")
   private MutableRate totalSucceededRefreshNodesResourcesRetrieved;
+  @Metric("Total number of successful Retrieved CheckForDecommissioningNodes and latency(ms)")
+  private MutableRate totalSucceededCheckForDecommissioningNodesRetrieved;
+  @Metric("Total number of successful Retrieved RefreshClusterMaxPriority and latency(ms)")
+  private MutableRate totalSucceededRefreshClusterMaxPriorityRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -372,6 +380,8 @@ public final class RouterMetrics {
   private MutableQuantiles getClusterUserInfoLatency;
   private MutableQuantiles updateNodeResourceLatency;
   private MutableQuantiles refreshNodesResourcesLatency;
+  private MutableQuantiles checkForDecommissioningNodesLatency;
+  private MutableQuantiles refreshClusterMaxPriorityLatency;
 
   private static volatile RouterMetrics instance = null;
   private static MetricsRegistry registry;
@@ -599,6 +609,13 @@ public final class RouterMetrics {
     refreshNodesResourcesLatency =
         registry.newQuantiles("refreshNodesResourcesLatency",
         "latency of refresh nodes resources timeouts", "ops", "latency", 10);
+
+    checkForDecommissioningNodesLatency = registry.newQuantiles(
+        "checkForDecommissioningNodesLatency", "latency of check for decommissioning nodes timeouts",
+        "ops", "latency", 10);
+
+    refreshClusterMaxPriorityLatency = registry.newQuantiles("refreshClusterMaxPriorityLatency",
+        "latency of refresh cluster max priority timeouts", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -925,6 +942,16 @@ public final class RouterMetrics {
     return totalSucceededRefreshNodesResourcesRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededCheckForDecommissioningNodesRetrieved() {
+    return totalSucceededCheckForDecommissioningNodesRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededRefreshClusterMaxPriorityRetrieved() {
+    return totalSucceededRefreshClusterMaxPriorityRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
     return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
@@ -1245,6 +1272,16 @@ public final class RouterMetrics {
     return totalSucceededRefreshNodesResourcesRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededCheckForDecommissioningNodesRetrieved() {
+    return totalSucceededCheckForDecommissioningNodesRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededRefreshClusterMaxPriorityRetrieved() {
+    return totalSucceededRefreshClusterMaxPriorityRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
     return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
@@ -1514,6 +1551,14 @@ public final class RouterMetrics {
     return numRefreshNodesResourcesFailedRetrieved.value();
   }
 
+  public int getCheckForDecommissioningNodesFailedRetrieved() {
+    return numCheckForDecommissioningNodesFailedRetrieved.value();
+  }
+
+  public int getRefreshClusterMaxPriorityFailedRetrieved() {
+    return numRefreshClusterMaxPriorityFailedRetrieved.value();
+  }
+
   public int getDelegationTokenFailedRetrieved() {
     return numGetDelegationTokenFailedRetrieved.value();
   }
@@ -1847,6 +1892,16 @@ public final class RouterMetrics {
     refreshNodesResourcesLatency.add(duration);
   }
 
+  public void succeededCheckForDecommissioningNodesRetrieved(long duration) {
+    totalSucceededCheckForDecommissioningNodesRetrieved.add(duration);
+    checkForDecommissioningNodesLatency.add(duration);
+  }
+
+  public void succeededRefreshClusterMaxPriorityRetrieved(long duration) {
+    totalSucceededRefreshClusterMaxPriorityRetrieved.add(duration);
+    refreshClusterMaxPriorityLatency.add(duration);
+  }
+
   public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
     totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
     refreshSuperUserGroupsConfLatency.add(duration);
@@ -2091,6 +2146,14 @@ public final class RouterMetrics {
     numRefreshNodesResourcesFailedRetrieved.incr();
   }
 
+  public void incrCheckForDecommissioningNodesFailedRetrieved() {
+    numCheckForDecommissioningNodesFailedRetrieved.incr();
+  }
+
+  public void incrRefreshClusterMaxPriorityFailedRetrieved() {
+    numRefreshClusterMaxPriorityFailedRetrieved.incr();
+  }
+
   public void incrGetDelegationTokenFailedRetrieved() {
     numGetDelegationTokenFailedRetrieved.incr();
   }
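Each new admin API gets the standard RouterMetrics triple: a failure gauge, an aggregate success MutableRate, and a MutableQuantiles latency gauge. The intended call pattern is mirrored by the interceptor code below; this is only a condensed sketch of that flow, where clock is the interceptor's existing timer:

    long startTime = clock.getTime();
    try {
      // ... forward the request to the selected subCluster ...
      long stopTime = clock.getTime();
      routerMetrics.succeededCheckForDecommissioningNodesRetrieved(stopTime - startTime);
    } catch (YarnException e) {
      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
      throw e;
    }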
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
index c3cac82e38c..76f1e8fdc67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -70,14 +71,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {
 
@@ -623,16 +627,89 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
 
   @Override
   public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
-      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
-      throws YarnException, IOException {
-    throw new NotImplementedException();
+      CheckForDecommissioningNodesRequest request) throws YarnException, IOException {
+
+    // Parameter check.
+    if (request == null) {
+      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing checkForDecommissioningNodes request.", null);
+    }
+
+    String subClusterId = request.getSubClusterId();
+    if (StringUtils.isBlank(subClusterId)) {
+      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing checkForDecommissioningNodes SubClusterId.",
+          null);
+    }
+
+    try {
+      long startTime = clock.getTime();
+      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
+          new Class[]{CheckForDecommissioningNodesRequest.class}, new Object[]{request});
+
+      Collection<CheckForDecommissioningNodesResponse> responses =
+          remoteMethod.invokeConcurrent(this, CheckForDecommissioningNodesResponse.class,
+          subClusterId);
+
+      if (CollectionUtils.isNotEmpty(responses)) {
+        // We targeted a single subCluster, so the returned list holds exactly one response.
+        List<CheckForDecommissioningNodesResponse> collects =
+            responses.stream().collect(Collectors.toList());
+        if (!collects.isEmpty() && collects.size() == 1) {
+          CheckForDecommissioningNodesResponse response = collects.get(0);
+          long stopTime = clock.getTime();
+          routerMetrics.succeededCheckForDecommissioningNodesRetrieved(stopTime - startTime);
+          Set<NodeId> nodes = response.getDecommissioningNodes();
+          return CheckForDecommissioningNodesResponse.newInstance(nodes);
+        }
+      }
+    } catch (YarnException e) {
+      routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to checkForDecommissioningNodes due to exception " + e.getMessage());
+    }
+
+    routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
+    throw new YarnException("Unable to checkForDecommissioningNodes.");
   }
 
   @Override
   public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
-      RefreshClusterMaxPriorityRequest request)
-      throws YarnException, IOException {
-    throw new NotImplementedException();
+      RefreshClusterMaxPriorityRequest request) throws YarnException, IOException {
+
+    // Parameter check.
+    if (request == null) {
+      routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing RefreshClusterMaxPriority request.", null);
+    }
+
+    String subClusterId = request.getSubClusterId();
+    if (StringUtils.isBlank(subClusterId)) {
+      routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing RefreshClusterMaxPriority SubClusterId.",
+          null);
+    }
+
+    try {
+      long startTime = clock.getTime();
+      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
+          new Class[]{RefreshClusterMaxPriorityRequest.class}, new Object[]{request});
+      Collection<RefreshClusterMaxPriorityResponse> refreshClusterMaxPriorityResps =
+          remoteMethod.invokeConcurrent(this, RefreshClusterMaxPriorityResponse.class,
+          subClusterId);
+      if (CollectionUtils.isNotEmpty(refreshClusterMaxPriorityResps)) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededRefreshClusterMaxPriorityRetrieved(stopTime - startTime);
+        return RefreshClusterMaxPriorityResponse.newInstance();
+      }
+    } catch (YarnException e) {
+      routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to refreshClusterMaxPriority due to exception " + e.getMessage());
+    }
+
+    routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
+    throw new YarnException("Unable to refreshClusterMaxPriority.");
   }
 
   @Override
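From the client side, the Router stays a plain ResourceManagerAdministrationProtocol endpoint; the only federation-specific addition is the subClusterId carried inside the request. A hedged end-to-end sketch — adminProtocol stands for whatever proxy the caller already holds to the Router's admin service, and "SC-1" is illustrative:

    // Ask one subCluster's RM which nodes are still decommissioning.
    CheckForDecommissioningNodesRequest request =
        CheckForDecommissioningNodesRequest.newInstance("SC-1");
    CheckForDecommissioningNodesResponse response =
        adminProtocol.checkForDecommissioningNodes(request);
    Set<NodeId> decommissioning = response.getDecommissioningNodes();

    // Trigger a cluster-max-priority refresh on the same subCluster.
    adminProtocol.refreshClusterMaxPriority(
        RefreshClusterMaxPriorityRequest.newInstance("SC-1"));

Because invokeConcurrent is handed a single subClusterId, exactly one response is expected back; a missing or empty response collection falls through to the failure counter and a YarnException.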
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index f8dc03a04c6..eee74e5f525 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -598,6 +598,16 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed refreshNodesResources call");
       metrics.incrRefreshNodesResourcesFailedRetrieved();
     }
+
+    public void getCheckForDecommissioningNodesFailed() {
+      LOG.info("Mocked: failed checkForDecommissioningNodes call");
+      metrics.incrCheckForDecommissioningNodesFailedRetrieved();
+    }
+
+    public void getRefreshClusterMaxPriorityFailed() {
+      LOG.info("Mocked: failed refreshClusterMaxPriority call");
+      metrics.incrRefreshClusterMaxPriorityFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -898,6 +908,18 @@ public class TestRouterMetrics {
       LOG.info("Mocked: successful RefreshNodesResourcesRetrieved call with duration {}", duration);
       metrics.succeededRefreshNodesResourcesRetrieved(duration);
     }
+
+    public void getCheckForDecommissioningNodesRetrieved(long duration) {
+      LOG.info("Mocked: successful CheckForDecommissioningNodesRetrieved call with duration {}",
+          duration);
+      metrics.succeededCheckForDecommissioningNodesRetrieved(duration);
+    }
+
+    public void getRefreshClusterMaxPriorityRetrieved(long duration) {
+      LOG.info("Mocked: successful RefreshClusterMaxPriority call with duration {}",
+          duration);
+      metrics.succeededRefreshClusterMaxPriorityRetrieved(duration);
+    }
   }
 
   @Test
@@ -2042,4 +2064,49 @@ public class TestRouterMetrics {
     Assert.assertEquals(225, metrics.getLatencySucceededRefreshNodesResourcesRetrieved(),
         ASSERT_DOUBLE_DELTA);
   }
+
+  @Test
+  public void testCheckForDecommissioningNodesFailedRetrieved() {
+    long totalBadBefore = metrics.getCheckForDecommissioningNodesFailedRetrieved();
+    badSubCluster.getCheckForDecommissioningNodesFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getCheckForDecommissioningNodesFailedRetrieved());
+  }
+
+  @Test
+  public void testCheckForDecommissioningNodesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededCheckForDecommissioningNodesRetrieved();
+    goodSubCluster.getCheckForDecommissioningNodesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededCheckForDecommissioningNodesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededCheckForDecommissioningNodesRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getCheckForDecommissioningNodesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededCheckForDecommissioningNodesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededCheckForDecommissioningNodesRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testRefreshClusterMaxPriorityFailedRetrieved() {
+    long totalBadBefore = metrics.getRefreshClusterMaxPriorityFailedRetrieved();
+    badSubCluster.getRefreshClusterMaxPriorityFailed();
+    Assert.assertEquals(totalBadBefore + 1, metrics.getRefreshClusterMaxPriorityFailedRetrieved());
+  }
+
+  @Test
+  public void testRefreshClusterMaxPriorityRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededRefreshClusterMaxPriorityRetrieved();
+    goodSubCluster.getRefreshClusterMaxPriorityRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededRefreshClusterMaxPriorityRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededRefreshClusterMaxPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getRefreshClusterMaxPriorityRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededRefreshClusterMaxPriorityRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededRefreshClusterMaxPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
 }
\ No newline at end of file
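The repeated 150/300/225 numbers in these tests check the running mean, not a sum: the totalSucceeded* metrics are MutableRate gauges, so getLatencySucceeded*Retrieved() returns the average latency, and after recording 150 ms and 300 ms that is (150 + 300) / 2 = 225 ms.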
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
index fa38bd6f4ce..0fbe470a312 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLa
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -65,6 +67,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
@@ -521,4 +524,30 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
         "subClusterId = SC-NON is not an active subCluster.",
         () -> interceptor.replaceLabelsOnNode(request2));
   }
+
+  @Test
+  public void testCheckForDecommissioningNodesRequest() throws Exception {
+    // Case 1: the request itself is null.
+    LambdaTestUtils.intercept(YarnException.class, "Missing checkForDecommissioningNodes request.",
+        () -> interceptor.checkForDecommissioningNodes(null));
+
+    // Case 2: the request carries a null subClusterId.
+    CheckForDecommissioningNodesRequest request =
+        CheckForDecommissioningNodesRequest.newInstance(null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing checkForDecommissioningNodes SubClusterId.",
+        () -> interceptor.checkForDecommissioningNodes(request));
+  }
+
+  @Test
+  public void testCheckForDecommissioningNodesNormalRequest() throws Exception {
+    CheckForDecommissioningNodesRequest request =
+        CheckForDecommissioningNodesRequest.newInstance("SC-1");
+    CheckForDecommissioningNodesResponse response =
+        interceptor.checkForDecommissioningNodes(request);
+    assertNotNull(response);
+    Set<NodeId> nodeIds = response.getDecommissioningNodes();
+    assertNotNull(nodeIds);
+    assertEquals(0, nodeIds.size());
+  }
 }
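The new interceptor tests stop at checkForDecommissioningNodes; refreshClusterMaxPriority gets no direct coverage here. A companion test would follow the same shape — a sketch only, not part of this patch, assuming RefreshClusterMaxPriorityRequest is imported alongside the other protocol records:

    @Test
    public void testRefreshClusterMaxPriorityRequest() throws Exception {
      // The request itself is null.
      LambdaTestUtils.intercept(YarnException.class,
          "Missing RefreshClusterMaxPriority request.",
          () -> interceptor.refreshClusterMaxPriority(null));

      // The request carries a null subClusterId.
      RefreshClusterMaxPriorityRequest request =
          RefreshClusterMaxPriorityRequest.newInstance(null);
      LambdaTestUtils.intercept(YarnException.class,
          "Missing RefreshClusterMaxPriority SubClusterId.",
          () -> interceptor.refreshClusterMaxPriority(request));
    }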