From bcc51ce2c58f4fc7df9372f437ddf5c49813b51a Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 1 Mar 2023 06:44:00 +0800 Subject: [PATCH] =?UTF-8?q?YARN-11375.=20=20[Federation]=20Support=20refre?= =?UTF-8?q?shAdminAcls=E3=80=81refreshServiceAcls=20API's=20for=20Federati?= =?UTF-8?q?on.=20(#5312)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RefreshAdminAclsRequest.java | 23 +++++++ .../RefreshServiceAclsRequest.java | 23 +++++++ ...erver_resourcemanager_service_protos.proto | 2 + .../pb/RefreshAdminAclsRequestPBImpl.java | 49 +++++++++++--- .../pb/RefreshServiceAclsRequestPBImpl.java | 46 +++++++++++-- .../yarn/server/resourcemanager/MockRM.java | 20 ++++++ .../yarn/server/router/RouterMetrics.java | 62 +++++++++++++++++ .../rmadmin/FederationRMAdminInterceptor.java | 58 +++++++++++++++- .../yarn/server/router/TestRouterMetrics.java | 66 +++++++++++++++++++ .../TestFederationRMAdminInterceptor.java | 61 +++++++++++++++++ .../TestableFederationRMAdminInterceptor.java | 9 ++- 11 files changed, 399 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshAdminAclsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshAdminAclsRequest.java index 71c4a2c46d7..5371741331e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshAdminAclsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshAdminAclsRequest.java @@ -33,4 +33,27 @@ public abstract class RefreshAdminAclsRequest { Records.newRecord(RefreshAdminAclsRequest.class); return request; } + + @Public + @Stable + public static RefreshAdminAclsRequest newInstance(String subClusterId) { + RefreshAdminAclsRequest request = + Records.newRecord(RefreshAdminAclsRequest.class); + request.setSubClusterId(subClusterId); + return request; + } + + /** + * Get the subClusterId. + * + * @return subClusterId. + */ + public abstract String getSubClusterId(); + + /** + * Set the subClusterId. + * + * @param subClusterId subCluster Id. + */ + public abstract void setSubClusterId(String subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshServiceAclsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshServiceAclsRequest.java index 789f54fe29a..e382ebccba1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshServiceAclsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshServiceAclsRequest.java @@ -33,4 +33,27 @@ public abstract class RefreshServiceAclsRequest { Records.newRecord(RefreshServiceAclsRequest.class); return request; } + + @Public + @Stable + public static RefreshServiceAclsRequest newInstance(String subClusterId) { + RefreshServiceAclsRequest request = + Records.newRecord(RefreshServiceAclsRequest.class); + request.setSubClusterId(subClusterId); + return request; + } + + /** + * Get the subClusterId. + * + * @return subClusterId. + */ + public abstract String getSubClusterId(); + + /** + * Set the subClusterId. + * + * @param subClusterId subCluster Id. + */ + public abstract void setSubClusterId(String subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 97e29f954cd..4050a5b356f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -58,11 +58,13 @@ message RefreshUserToGroupsMappingsResponseProto { } message RefreshAdminAclsRequestProto { + optional string sub_cluster_id = 1; } message RefreshAdminAclsResponseProto { } message RefreshServiceAclsRequestProto { + optional string sub_cluster_id = 1; } message RefreshServiceAclsResponseProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshAdminAclsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshAdminAclsRequestPBImpl.java index 47eadc111bb..0738e8a1b0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshAdminAclsRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshAdminAclsRequestPBImpl.java @@ -18,21 +18,22 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.thirdparty.protobuf.TextFormat; @Private @Unstable -public class RefreshAdminAclsRequestPBImpl -extends RefreshAdminAclsRequest { +public class RefreshAdminAclsRequestPBImpl extends RefreshAdminAclsRequest { - RefreshAdminAclsRequestProto proto = RefreshAdminAclsRequestProto.getDefaultInstance(); - RefreshAdminAclsRequestProto.Builder builder = null; - boolean viaProto = false; + private RefreshAdminAclsRequestProto proto = RefreshAdminAclsRequestProto.getDefaultInstance(); + private RefreshAdminAclsRequestProto.Builder builder = null; + private boolean viaProto = false; public RefreshAdminAclsRequestPBImpl() { builder = RefreshAdminAclsRequestProto.newBuilder(); @@ -48,6 +49,13 @@ extends RefreshAdminAclsRequest { viaProto = true; return proto; } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RefreshAdminAclsRequestProto.newBuilder(proto); + } + viaProto = false; + } @Override public int hashCode() { @@ -56,16 +64,39 @@ extends RefreshAdminAclsRequest { @Override public boolean equals(Object other) { - if (other == null) + + if (!(other instanceof RefreshAdminAclsRequest)) { return false; - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); } - return false; + + RefreshAdminAclsRequestPBImpl otherImpl = this.getClass().cast(other); + return new EqualsBuilder() + .append(this.getProto(), otherImpl.getProto()) + .isEquals(); } @Override public String toString() { return TextFormat.shortDebugString(getProto()); } + + @Override + public String getSubClusterId() { + RefreshAdminAclsRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasSubClusterId = p.hasSubClusterId(); + if (hasSubClusterId) { + return p.getSubClusterId(); + } + return null; + } + + @Override + public void setSubClusterId(String subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(subClusterId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshServiceAclsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshServiceAclsRequestPBImpl.java index d4529f43e65..4c30d0f2a54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshServiceAclsRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshServiceAclsRequestPBImpl.java @@ -18,9 +18,11 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; import org.apache.hadoop.thirdparty.protobuf.TextFormat; @@ -29,10 +31,10 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat; @Unstable public class RefreshServiceAclsRequestPBImpl extends RefreshServiceAclsRequest { - RefreshServiceAclsRequestProto proto = + private RefreshServiceAclsRequestProto proto = RefreshServiceAclsRequestProto.getDefaultInstance(); - RefreshServiceAclsRequestProto.Builder builder = null; - boolean viaProto = false; + private RefreshServiceAclsRequestProto.Builder builder = null; + private boolean viaProto = false; public RefreshServiceAclsRequestPBImpl() { builder = RefreshServiceAclsRequestProto.newBuilder(); @@ -50,6 +52,13 @@ public class RefreshServiceAclsRequestPBImpl extends RefreshServiceAclsRequest { return proto; } + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RefreshServiceAclsRequestProto.newBuilder(proto); + } + viaProto = false; + } + @Override public int hashCode() { return getProto().hashCode(); @@ -57,16 +66,39 @@ public class RefreshServiceAclsRequestPBImpl extends RefreshServiceAclsRequest { @Override public boolean equals(Object other) { - if (other == null) + + if (!(other instanceof RefreshServiceAclsRequest)) { return false; - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); } - return false; + + RefreshServiceAclsRequestPBImpl otherImpl = this.getClass().cast(other); + return new EqualsBuilder() + .append(this.getProto(), otherImpl.getProto()) + .isEquals(); } @Override public String toString() { return TextFormat.shortDebugString(getProto()); } + + @Override + public String getSubClusterId() { + RefreshServiceAclsRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasSubClusterId = p.hasSubClusterId(); + if (hasSubClusterId) { + return p.getSubClusterId(); + } + return null; + } + + @Override + public void setSubClusterId(String subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(subClusterId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 316f8e06cb5..faa5ddb7186 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNod import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; @@ -55,8 +56,13 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -789,6 +795,7 @@ public class MockRM extends ResourceManager { @Override protected AdminService createAdminService() { + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); return new AdminService(this) { @Override protected void startServer() { @@ -799,6 +806,19 @@ public class MockRM extends ResourceManager { protected void stopServer() { // don't do anything } + + @Override + public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request) + throws YarnException, IOException { + Configuration config = this.getConfig(); + boolean authorization = + config.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false); + if (!authorization) { + throw RPCUtil.getRemoteException(new IOException("Service Authorization (" + + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + ") not enabled.")); + } + return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); + } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index d3dd7bab11f..fdcd890ea6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -147,6 +147,10 @@ public final class RouterMetrics { private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved; @Metric("# of refreshUserToGroupsMappings failed to be retrieved") private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved; + @Metric("# of refreshAdminAcls failed to be retrieved") + private MutableGaugeInt numRefreshAdminAclsFailedRetrieved; + @Metric("# of refreshServiceAcls failed to be retrieved") + private MutableGaugeInt numRefreshServiceAclsFailedRetrieved; @Metric("# of replaceLabelsOnNodes failed to be retrieved") private MutableGaugeInt numReplaceLabelsOnNodesFailedRetrieved; @Metric("# of replaceLabelsOnNode failed to be retrieved") @@ -267,6 +271,10 @@ public final class RouterMetrics { private MutableRate totalSucceededReplaceLabelsOnNodeRetrieved; @Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)") private MutableRate totalSucceededGetSchedulerInfoRetrieved; + @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)") + private MutableRate totalSucceededRefreshAdminAclsRetrieved; + @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)") + private MutableRate totalSucceededRefreshServiceAclsRetrieved; @Metric("Total number of successful Retrieved AddToClusterNodeLabels and latency(ms)") private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved; @Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)") @@ -328,6 +336,8 @@ public final class RouterMetrics { private MutableQuantiles getSchedulerInfoRetrievedLatency; private MutableQuantiles refreshSuperUserGroupsConfLatency; private MutableQuantiles refreshUserToGroupsMappingsLatency; + private MutableQuantiles refreshAdminAclsLatency; + private MutableQuantiles refreshServiceAclsLatency; private MutableQuantiles replaceLabelsOnNodesLatency; private MutableQuantiles replaceLabelsOnNodeLatency; private MutableQuantiles addToClusterNodeLabelsLatency; @@ -524,6 +534,12 @@ public final class RouterMetrics { refreshUserToGroupsMappingsLatency = registry.newQuantiles("refreshUserToGroupsMappingsLatency", "latency of refresh user to groups mappings timeouts", "ops", "latency", 10); + refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency", + "latency of refresh admin acls timeouts", "ops", "latency", 10); + + refreshServiceAclsLatency = registry.newQuantiles("refreshServiceAclsLatency", + "latency of refresh service acls timeouts", "ops", "latency", 10); + replaceLabelsOnNodesLatency = registry.newQuantiles("replaceLabelsOnNodesLatency", "latency of replace labels on nodes timeouts", "ops", "latency", 10); @@ -811,6 +827,16 @@ public final class RouterMetrics { return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededRefreshAdminAclsRetrieved() { + return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededRefreshServiceAclsRetrieved() { + return totalSucceededRefreshServiceAclsRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededAddToClusterNodeLabelsRetrieved() { return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().numSamples(); @@ -1091,6 +1117,16 @@ public final class RouterMetrics { return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededRefreshAdminAclsRetrieved() { + return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededRefreshServiceAclsRetrieved() { + return totalSucceededRefreshServiceAclsRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededAddToClusterNodeLabelsRetrieved() { return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().mean(); @@ -1322,6 +1358,14 @@ public final class RouterMetrics { return numRefreshUserToGroupsMappingsFailedRetrieved.value(); } + public int getNumRefreshAdminAclsFailedRetrieved() { + return numRefreshAdminAclsFailedRetrieved.value(); + } + + public int getNumRefreshServiceAclsFailedRetrieved() { + return numRefreshServiceAclsFailedRetrieved.value(); + } + public int getNumReplaceLabelsOnNodesFailedRetrieved() { return numReplaceLabelsOnNodesFailedRetrieved.value(); } @@ -1621,6 +1665,16 @@ public final class RouterMetrics { getSchedulerInfoRetrievedLatency.add(duration); } + public void succeededRefreshAdminAclsRetrieved(long duration) { + totalSucceededRefreshAdminAclsRetrieved.add(duration); + refreshAdminAclsLatency.add(duration); + } + + public void succeededRefreshServiceAclsRetrieved(long duration) { + totalSucceededRefreshServiceAclsRetrieved.add(duration); + refreshServiceAclsLatency.add(duration); + } + public void succeededAddToClusterNodeLabelsRetrieved(long duration) { totalSucceededAddToClusterNodeLabelsRetrieved.add(duration); addToClusterNodeLabelsLatency.add(duration); @@ -1835,6 +1889,14 @@ public final class RouterMetrics { numRefreshUserToGroupsMappingsFailedRetrieved.incr(); } + public void incrRefreshAdminAclsFailedRetrieved() { + numRefreshAdminAclsFailedRetrieved.incr(); + } + + public void incrRefreshServiceAclsFailedRetrieved() { + numRefreshServiceAclsFailedRetrieved.incr(); + } + public void incrAddToClusterNodeLabelsFailedRetrieved() { numAddToClusterNodeLabelsFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 41d87c3f588..93e864bb980 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -372,13 +372,67 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep @Override public RefreshAdminAclsResponse refreshAdminAcls(RefreshAdminAclsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + + // parameter verification. + if (request == null) { + routerMetrics.incrRefreshAdminAclsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing RefreshAdminAcls request.", null); + } + + // call refreshAdminAcls of activeSubClusters. + try { + long startTime = clock.getTime(); + RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( + new Class[] {RefreshAdminAclsRequest.class}, new Object[] {request}); + String subClusterId = request.getSubClusterId(); + Collection refreshAdminAclsResps = + remoteMethod.invokeConcurrent(this, RefreshAdminAclsResponse.class, subClusterId); + if (CollectionUtils.isNotEmpty(refreshAdminAclsResps)) { + long stopTime = clock.getTime(); + routerMetrics.succeededRefreshAdminAclsRetrieved(stopTime - startTime); + return RefreshAdminAclsResponse.newInstance(); + } + } catch (YarnException e) { + routerMetrics.incrRefreshAdminAclsFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to refreshAdminAcls due to exception. " + e.getMessage()); + } + + routerMetrics.incrRefreshAdminAclsFailedRetrieved(); + throw new YarnException("Unable to refreshAdminAcls."); } @Override public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + + // parameter verification. + if (request == null) { + routerMetrics.incrRefreshServiceAclsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing RefreshServiceAcls request.", null); + } + + // call refreshAdminAcls of activeSubClusters. + try { + long startTime = clock.getTime(); + RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( + new Class[]{RefreshServiceAclsRequest.class}, new Object[]{request}); + String subClusterId = request.getSubClusterId(); + Collection refreshServiceAclsResps = + remoteMethod.invokeConcurrent(this, RefreshServiceAclsResponse.class, subClusterId); + if (CollectionUtils.isNotEmpty(refreshServiceAclsResps)) { + long stopTime = clock.getTime(); + routerMetrics.succeededRefreshServiceAclsRetrieved(stopTime - startTime); + return RefreshServiceAclsResponse.newInstance(); + } + } catch (YarnException e) { + routerMetrics.incrRefreshServiceAclsFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to refreshAdminAcls due to exception. " + e.getMessage()); + } + + routerMetrics.incrRefreshServiceAclsFailedRetrieved(); + throw new YarnException("Unable to refreshServiceAcls."); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index db0b6837603..a3756174573 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -534,6 +534,16 @@ public class TestRouterMetrics { metrics.incrRenewDelegationTokenFailedRetrieved(); } + public void getRefreshAdminAclsFailedRetrieved() { + LOG.info("Mocked: failed refreshAdminAcls call"); + metrics.incrRefreshAdminAclsFailedRetrieved(); + } + + public void getRefreshServiceAclsFailedRetrieved() { + LOG.info("Mocked: failed refreshServiceAcls call"); + metrics.incrRefreshServiceAclsFailedRetrieved(); + } + public void getReplaceLabelsOnNodesFailed() { LOG.info("Mocked: failed replaceLabelsOnNodes call"); metrics.incrReplaceLabelsOnNodesFailedRetrieved(); @@ -789,6 +799,16 @@ public class TestRouterMetrics { metrics.succeededRenewDelegationTokenRetrieved(duration); } + public void getRefreshAdminAclsRetrieved(long duration) { + LOG.info("Mocked: successful RefreshAdminAcls call with duration {}", duration); + metrics.succeededRefreshAdminAclsRetrieved(duration); + } + + public void getRefreshServiceAclsRetrieved(long duration) { + LOG.info("Mocked: successful RefreshServiceAcls call with duration {}", duration); + metrics.succeededRefreshServiceAclsRetrieved(duration); + } + public void getNumSucceededReplaceLabelsOnNodesRetrieved(long duration) { LOG.info("Mocked: successful ReplaceLabelsOnNodes call with duration {}", duration); metrics.succeededReplaceLabelsOnNodesRetrieved(duration); @@ -1653,6 +1673,52 @@ public class TestRouterMetrics { metrics.getRenewDelegationTokenFailedRetrieved()); } + @Test + public void testRefreshAdminAclsRetrieved() { + long totalGoodBefore = metrics.getNumSucceededRefreshAdminAclsRetrieved(); + goodSubCluster.getRefreshAdminAclsRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededRefreshAdminAclsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededRefreshAdminAclsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getRefreshAdminAclsRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededRefreshAdminAclsRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededRefreshAdminAclsRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testRefreshAdminAclsRetrievedFailed() { + long totalBadBefore = metrics.getNumRefreshAdminAclsFailedRetrieved(); + badSubCluster.getRefreshAdminAclsFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getNumRefreshAdminAclsFailedRetrieved()); + } + + @Test + public void testRefreshServiceAclsRetrieved() { + long totalGoodBefore = metrics.getNumSucceededRefreshServiceAclsRetrieved(); + goodSubCluster.getRefreshServiceAclsRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededRefreshServiceAclsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededRefreshServiceAclsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getRefreshServiceAclsRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededRefreshServiceAclsRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededRefreshServiceAclsRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testRefreshServiceAclsRetrievedFailed() { + long totalBadBefore = metrics.getNumRefreshServiceAclsFailedRetrieved(); + badSubCluster.getRefreshServiceAclsFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getNumRefreshServiceAclsFailedRetrieved()); + } + @Test public void testReplaceLabelsOnNodesRetrieved() { long totalGoodBefore = metrics.getNumSucceededReplaceLabelsOnNodesRetrieved(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 977f82dd3cd..60a782bd8a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.router.rmadmin; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.api.records.DecommissionType; @@ -30,6 +31,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -113,6 +118,8 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + TestFederationRMAdminInterceptor.class.getName()); + config.setBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, true); return config; } @@ -259,4 +266,58 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { "subClusterId = SC-NON is not an active subCluster.", () -> interceptor.refreshUserToGroupsMappings(request1)); } + + @Test + public void testRefreshAdminAcls() throws Exception { + // null request. + LambdaTestUtils.intercept(YarnException.class, "Missing RefreshAdminAcls request.", + () -> interceptor.refreshAdminAcls(null)); + + // normal request. + RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance(); + RefreshAdminAclsResponse response = interceptor.refreshAdminAcls(request); + assertNotNull(response); + } + + @Test + public void testSC1RefreshAdminAcls() throws Exception { + // case 1, test the existing subCluster (SC-1). + String existSubCluster = "SC-1"; + RefreshAdminAclsRequest request = RefreshAdminAclsRequest.newInstance(existSubCluster); + RefreshAdminAclsResponse response = interceptor.refreshAdminAcls(request); + assertNotNull(response); + + // case 2, test the non-exist subCluster. + String notExistsSubCluster = "SC-NON"; + RefreshAdminAclsRequest request1 = RefreshAdminAclsRequest.newInstance(notExistsSubCluster); + LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.", + () -> interceptor.refreshAdminAcls(request1)); + } + + @Test + public void testRefreshServiceAcls() throws Exception { + // null request. + LambdaTestUtils.intercept(YarnException.class, "Missing RefreshServiceAcls request.", + () -> interceptor.refreshServiceAcls(null)); + + // normal request. + RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance(); + RefreshServiceAclsResponse response = interceptor.refreshServiceAcls(request); + assertNotNull(response); + } + + @Test + public void testSC1RefreshServiceAcls() throws Exception { + // case 1, test the existing subCluster (SC-1). + String existSubCluster = "SC-1"; + RefreshServiceAclsRequest request = RefreshServiceAclsRequest.newInstance(existSubCluster); + RefreshServiceAclsResponse response = interceptor.refreshServiceAcls(request); + assertNotNull(response); + + // case 2, test the non-exist subCluster. + String notExistsSubCluster = "SC-NON"; + RefreshServiceAclsRequest request1 = RefreshServiceAclsRequest.newInstance(notExistsSubCluster); + LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.", + () -> interceptor.refreshServiceAcls(request1)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java index 26f50f88b89..b95bcd4a62b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import org.apache.commons.collections.MapUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -35,6 +36,8 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_CLUSTER_ID; + public class TestableFederationRMAdminInterceptor extends FederationRMAdminInterceptor { // Record log information @@ -55,11 +58,13 @@ public class TestableFederationRMAdminInterceptor extends FederationRMAdminInter if (mockRMs.containsKey(subClusterId)) { mockRM = mockRMs.get(subClusterId); } else { - mockRM = new MockRM(); + YarnConfiguration config = new YarnConfiguration(super.getConf()); + config.set(RM_CLUSTER_ID, "subcluster." + subClusterId); + mockRM = new MockRM(config); if (badSubCluster.contains(subClusterId)) { return new MockRMAdminBadService(mockRM); } - mockRM.init(super.getConf()); + mockRM.init(config); mockRM.start(); mockRMs.put(subClusterId, mockRM); }