YARN-11374. [Federation] Support refreshSuperUserGroupsConfiguration、refreshUserToGroupsMappings API's for Federation. (#5193)
This commit is contained in:
parent
1263e024b9
commit
4520448327
|
@ -33,4 +33,27 @@ public abstract class RefreshSuperUserGroupsConfigurationRequest {
|
|||
Records.newRecord(RefreshSuperUserGroupsConfigurationRequest.class);
|
||||
return request;
|
||||
}
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public static RefreshSuperUserGroupsConfigurationRequest newInstance(String subClusterId) {
|
||||
RefreshSuperUserGroupsConfigurationRequest request =
|
||||
Records.newRecord(RefreshSuperUserGroupsConfigurationRequest.class);
|
||||
request.setSubClusterId(subClusterId);
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the subClusterId.
|
||||
*
|
||||
* @return subClusterId.
|
||||
*/
|
||||
public abstract String getSubClusterId();
|
||||
|
||||
/**
|
||||
* Set the subClusterId.
|
||||
*
|
||||
* @param subClusterId subCluster Id.
|
||||
*/
|
||||
public abstract void setSubClusterId(String subClusterId);
|
||||
}
|
||||
|
|
|
@ -33,4 +33,27 @@ public abstract class RefreshUserToGroupsMappingsRequest {
|
|||
Records.newRecord(RefreshUserToGroupsMappingsRequest.class);
|
||||
return request;
|
||||
}
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public static RefreshUserToGroupsMappingsRequest newInstance(String subClusterId) {
|
||||
RefreshUserToGroupsMappingsRequest request =
|
||||
Records.newRecord(RefreshUserToGroupsMappingsRequest.class);
|
||||
request.setSubClusterId(subClusterId);
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the subClusterId.
|
||||
*
|
||||
* @return subClusterId.
|
||||
*/
|
||||
public abstract String getSubClusterId();
|
||||
|
||||
/**
|
||||
* Set the subClusterId.
|
||||
*
|
||||
* @param subClusterId subCluster Id.
|
||||
*/
|
||||
public abstract void setSubClusterId(String subClusterId);
|
||||
}
|
||||
|
|
|
@ -46,11 +46,13 @@ message RefreshNodesResponseProto {
|
|||
}
|
||||
|
||||
message RefreshSuperUserGroupsConfigurationRequestProto {
|
||||
optional string sub_cluster_id = 1;
|
||||
}
|
||||
message RefreshSuperUserGroupsConfigurationResponseProto {
|
||||
}
|
||||
|
||||
message RefreshUserToGroupsMappingsRequestProto {
|
||||
optional string sub_cluster_id = 1;
|
||||
}
|
||||
message RefreshUserToGroupsMappingsResponseProto {
|
||||
}
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
|
||||
|
@ -27,18 +29,20 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public class RefreshSuperUserGroupsConfigurationRequestPBImpl
|
||||
extends RefreshSuperUserGroupsConfigurationRequest {
|
||||
public class RefreshSuperUserGroupsConfigurationRequestPBImpl
|
||||
extends RefreshSuperUserGroupsConfigurationRequest {
|
||||
|
||||
RefreshSuperUserGroupsConfigurationRequestProto proto = RefreshSuperUserGroupsConfigurationRequestProto.getDefaultInstance();
|
||||
RefreshSuperUserGroupsConfigurationRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
private RefreshSuperUserGroupsConfigurationRequestProto proto =
|
||||
RefreshSuperUserGroupsConfigurationRequestProto.getDefaultInstance();
|
||||
private RefreshSuperUserGroupsConfigurationRequestProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public RefreshSuperUserGroupsConfigurationRequestPBImpl() {
|
||||
builder = RefreshSuperUserGroupsConfigurationRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public RefreshSuperUserGroupsConfigurationRequestPBImpl(RefreshSuperUserGroupsConfigurationRequestProto proto) {
|
||||
public RefreshSuperUserGroupsConfigurationRequestPBImpl(
|
||||
RefreshSuperUserGroupsConfigurationRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
@ -56,16 +60,46 @@ extends RefreshSuperUserGroupsConfigurationRequest {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
|
||||
if (!(other instanceof RefreshSuperUserGroupsConfigurationRequest)) {
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
|
||||
RefreshSuperUserGroupsConfigurationRequestPBImpl otherImpl = this.getClass().cast(other);
|
||||
return new EqualsBuilder()
|
||||
.append(this.getProto(), otherImpl.getProto())
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = RefreshSuperUserGroupsConfigurationRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSubClusterId() {
|
||||
RefreshSuperUserGroupsConfigurationRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
boolean hasSubClusterId = p.hasSubClusterId();
|
||||
if (hasSubClusterId) {
|
||||
return p.getSubClusterId();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
maybeInitBuilder();
|
||||
if (subClusterId == null) {
|
||||
builder.clearSubClusterId();
|
||||
return;
|
||||
}
|
||||
builder.setSubClusterId(subClusterId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
|
||||
|
@ -27,12 +29,12 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
|
|||
|
||||
@Private
|
||||
@Unstable
|
||||
public class RefreshUserToGroupsMappingsRequestPBImpl
|
||||
extends RefreshUserToGroupsMappingsRequest {
|
||||
public class RefreshUserToGroupsMappingsRequestPBImpl extends RefreshUserToGroupsMappingsRequest {
|
||||
|
||||
RefreshUserToGroupsMappingsRequestProto proto = RefreshUserToGroupsMappingsRequestProto.getDefaultInstance();
|
||||
RefreshUserToGroupsMappingsRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
private RefreshUserToGroupsMappingsRequestProto proto =
|
||||
RefreshUserToGroupsMappingsRequestProto.getDefaultInstance();
|
||||
private RefreshUserToGroupsMappingsRequestProto.Builder builder = null;
|
||||
private boolean viaProto = false;
|
||||
|
||||
public RefreshUserToGroupsMappingsRequestPBImpl() {
|
||||
builder = RefreshUserToGroupsMappingsRequestProto.newBuilder();
|
||||
|
@ -56,16 +58,46 @@ extends RefreshUserToGroupsMappingsRequest {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
|
||||
if (!(other instanceof RefreshUserToGroupsMappingsRequest)) {
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
|
||||
RefreshUserToGroupsMappingsRequestPBImpl otherImpl = this.getClass().cast(other);
|
||||
return new EqualsBuilder()
|
||||
.append(this.getProto(), otherImpl.getProto())
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
private synchronized void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = RefreshUserToGroupsMappingsRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSubClusterId() {
|
||||
RefreshUserToGroupsMappingsRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
boolean hasSubClusterId = p.hasSubClusterId();
|
||||
if (hasSubClusterId) {
|
||||
return p.getSubClusterId();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSubClusterId(String subClusterId) {
|
||||
maybeInitBuilder();
|
||||
if (subClusterId == null) {
|
||||
builder.clearSubClusterId();
|
||||
return;
|
||||
}
|
||||
builder.setSubClusterId(subClusterId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,6 +135,10 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
|
||||
@Metric("# of renewDelegationToken failed to be retrieved")
|
||||
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
|
||||
@Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved")
|
||||
private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved;
|
||||
@Metric("# of refreshUserToGroupsMappings failed to be retrieved")
|
||||
private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
|
@ -231,6 +235,10 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
|
||||
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
|
||||
private MutableRate totalSucceededCancelDelegationTokenRetrieved;
|
||||
@Metric("Total number of successful Retrieved RefreshSuperUserGroupsConfig and latency(ms)")
|
||||
private MutableRate totalSucceededRefreshSuperUserGroupsConfigurationRetrieved;
|
||||
@Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
|
||||
private MutableRate totalSucceededRefreshUserToGroupsMappingsRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
|
@ -282,6 +290,8 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles getDelegationTokenLatency;
|
||||
private MutableQuantiles renewDelegationTokenLatency;
|
||||
private MutableQuantiles cancelDelegationTokenLatency;
|
||||
private MutableQuantiles refreshSuperUserGroupsConfLatency;
|
||||
private MutableQuantiles refreshUserToGroupsMappingsLatency;
|
||||
|
||||
private static volatile RouterMetrics instance = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
@ -456,6 +466,11 @@ public final class RouterMetrics {
|
|||
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
|
||||
"latency of cancel delegation token timeouts", "ops", "latency", 10);
|
||||
|
||||
refreshSuperUserGroupsConfLatency = registry.newQuantiles("refreshSuperUserGroupsConfLatency",
|
||||
"latency of refresh superuser groups configuration timeouts", "ops", "latency", 10);
|
||||
|
||||
refreshUserToGroupsMappingsLatency = registry.newQuantiles("refreshUserToGroupsMappingsLatency",
|
||||
"latency of refresh user to groups mappings timeouts", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
|
@ -712,6 +727,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
|
||||
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().mean();
|
||||
|
@ -947,6 +967,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
|
||||
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedCreated() {
|
||||
return numAppsFailedCreated.value();
|
||||
|
@ -1145,6 +1170,14 @@ public final class RouterMetrics {
|
|||
return numRefreshNodesFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getNumRefreshSuperUserGroupsConfigurationFailedRetrieved() {
|
||||
return numRefreshSuperUserGroupsConfigurationFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getNumRefreshUserToGroupsMappingsFailedRetrieved() {
|
||||
return numRefreshUserToGroupsMappingsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getDelegationTokenFailedRetrieved() {
|
||||
return numGetDelegationTokenFailedRetrieved.value();
|
||||
}
|
||||
|
@ -1392,6 +1425,16 @@ public final class RouterMetrics {
|
|||
cancelDelegationTokenLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
|
||||
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
|
||||
refreshSuperUserGroupsConfLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededRefreshUserToGroupsMappingsRetrieved(long duration) {
|
||||
totalSucceededRefreshUserToGroupsMappingsRetrieved.add(duration);
|
||||
refreshUserToGroupsMappingsLatency.add(duration);
|
||||
}
|
||||
|
||||
public void incrAppsFailedCreated() {
|
||||
numAppsFailedCreated.incr();
|
||||
}
|
||||
|
@ -1568,6 +1611,14 @@ public final class RouterMetrics {
|
|||
numRefreshNodesFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrRefreshSuperUserGroupsConfigurationFailedRetrieved() {
|
||||
numRefreshSuperUserGroupsConfigurationFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrRefreshUserToGroupsMappingsFailedRetrieved() {
|
||||
numRefreshUserToGroupsMappingsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrGetDelegationTokenFailedRetrieved() {
|
||||
numGetDelegationTokenFailedRetrieved.incr();
|
||||
}
|
||||
|
|
|
@ -192,7 +192,8 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
|||
}
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrRefreshQueuesFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(e, "Unable to refreshQueue due to exception.");
|
||||
RouterServerUtil.logAndThrowException(e,
|
||||
"Unable to refreshQueue due to exception. " + e.getMessage());
|
||||
}
|
||||
|
||||
routerMetrics.incrRefreshQueuesFailedRetrieved();
|
||||
|
@ -245,25 +246,127 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
|||
}
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrRefreshNodesFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(e, "Unable to refreshNodes due to exception.");
|
||||
RouterServerUtil.logAndThrowException(e,
|
||||
"Unable to refreshNodes due to exception. " + e.getMessage());
|
||||
}
|
||||
|
||||
routerMetrics.incrRefreshNodesFailedRetrieved();
|
||||
throw new YarnException("Unable to refreshNodes.");
|
||||
throw new YarnException("Unable to refreshNodes due to exception.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh SuperUserGroupsConfiguration requests.
|
||||
*
|
||||
* The Router supports refreshing all subCluster SuperUserGroupsConfiguration at once,
|
||||
* and also supports refreshing SuperUserGroupsConfiguration by SubCluster.
|
||||
*
|
||||
* @param request RefreshSuperUserGroupsConfigurationRequest,
|
||||
* If subClusterId is not empty, it means that we want to
|
||||
* refresh the superuser groups configuration of the specified subClusterId.
|
||||
* If subClusterId is empty, it means we want to
|
||||
* refresh all subCluster superuser groups configuration.
|
||||
*
|
||||
* @return RefreshSuperUserGroupsConfigurationResponse,
|
||||
* There is no specific information in the response, as long as it is not empty,
|
||||
* it means that the request is successful.
|
||||
*
|
||||
* @throws StandbyException exception thrown by non-active server.
|
||||
* @throws YarnException indicates exceptions from yarn servers.
|
||||
* @throws IOException io error occurs.
|
||||
*/
|
||||
@Override
|
||||
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
|
||||
// parameter verification.
|
||||
if (request == null) {
|
||||
routerMetrics.incrRefreshSuperUserGroupsConfigurationFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Missing RefreshSuperUserGroupsConfiguration request.",
|
||||
null);
|
||||
}
|
||||
|
||||
// call refreshSuperUserGroupsConfiguration of activeSubClusters.
|
||||
try {
|
||||
long startTime = clock.getTime();
|
||||
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
|
||||
new Class[] {RefreshSuperUserGroupsConfigurationRequest.class}, new Object[] {request});
|
||||
|
||||
String subClusterId = request.getSubClusterId();
|
||||
Collection<RefreshSuperUserGroupsConfigurationResponse> refreshSuperUserGroupsConfResps =
|
||||
remoteMethod.invokeConcurrent(this, RefreshSuperUserGroupsConfigurationResponse.class,
|
||||
subClusterId);
|
||||
|
||||
if (CollectionUtils.isNotEmpty(refreshSuperUserGroupsConfResps)) {
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededRefreshSuperUserGroupsConfRetrieved(stopTime - startTime);
|
||||
return RefreshSuperUserGroupsConfigurationResponse.newInstance();
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrRefreshSuperUserGroupsConfigurationFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(e,
|
||||
"Unable to refreshSuperUserGroupsConfiguration due to exception. " + e.getMessage());
|
||||
}
|
||||
|
||||
routerMetrics.incrRefreshSuperUserGroupsConfigurationFailedRetrieved();
|
||||
throw new YarnException("Unable to refreshSuperUserGroupsConfiguration.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh UserToGroupsMappings requests.
|
||||
*
|
||||
* The Router supports refreshing all subCluster UserToGroupsMappings at once,
|
||||
* and also supports refreshing UserToGroupsMappings by subCluster.
|
||||
*
|
||||
* @param request RefreshUserToGroupsMappingsRequest,
|
||||
* If subClusterId is not empty, it means that we want to
|
||||
* refresh the user groups mapping of the specified subClusterId.
|
||||
* If subClusterId is empty, it means we want to
|
||||
* refresh all subCluster user groups mapping.
|
||||
*
|
||||
* @return RefreshUserToGroupsMappingsResponse,
|
||||
* There is no specific information in the response, as long as it is not empty,
|
||||
* it means that the request is successful.
|
||||
*
|
||||
* @throws StandbyException exception thrown by non-active server.
|
||||
* @throws YarnException indicates exceptions from yarn servers.
|
||||
* @throws IOException io error occurs.
|
||||
*/
|
||||
@Override
|
||||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
RefreshUserToGroupsMappingsRequest request) throws StandbyException, YarnException,
|
||||
IOException {
|
||||
|
||||
// parameter verification.
|
||||
if (request == null) {
|
||||
routerMetrics.incrRefreshUserToGroupsMappingsFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Missing RefreshUserToGroupsMappings request.", null);
|
||||
}
|
||||
|
||||
// call refreshUserToGroupsMappings of activeSubClusters.
|
||||
try {
|
||||
long startTime = clock.getTime();
|
||||
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
|
||||
new Class[] {RefreshUserToGroupsMappingsRequest.class}, new Object[] {request});
|
||||
|
||||
String subClusterId = request.getSubClusterId();
|
||||
Collection<RefreshUserToGroupsMappingsResponse> refreshUserToGroupsMappingsResps =
|
||||
remoteMethod.invokeConcurrent(this, RefreshUserToGroupsMappingsResponse.class,
|
||||
subClusterId);
|
||||
|
||||
if (CollectionUtils.isNotEmpty(refreshUserToGroupsMappingsResps)) {
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededRefreshUserToGroupsMappingsRetrieved(stopTime - startTime);
|
||||
return RefreshUserToGroupsMappingsResponse.newInstance();
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
routerMetrics.incrRefreshUserToGroupsMappingsFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(e,
|
||||
"Unable to refreshUserToGroupsMappings due to exception. " + e.getMessage());
|
||||
}
|
||||
|
||||
routerMetrics.incrRefreshUserToGroupsMappingsFailedRetrieved();
|
||||
throw new YarnException("Unable to refreshUserToGroupsMappings.");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,6 +25,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
|
@ -37,6 +42,8 @@ import org.slf4j.LoggerFactory;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
/**
|
||||
* Extends the FederationRMAdminInterceptor and overrides methods to provide a
|
||||
* testable implementation of FederationRMAdminInterceptor.
|
||||
|
@ -128,7 +135,8 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
|
|||
|
||||
// normal request.
|
||||
RefreshQueuesRequest request = RefreshQueuesRequest.newInstance();
|
||||
interceptor.refreshQueues(request);
|
||||
RefreshQueuesResponse response = interceptor.refreshQueues(request);
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -182,4 +190,73 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
|
|||
"subClusterId = SC-NON is not an active subCluster.",
|
||||
() -> interceptor.refreshNodes(request1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshSuperUserGroupsConfiguration() throws Exception {
|
||||
// null request.
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"Missing RefreshSuperUserGroupsConfiguration request.",
|
||||
() -> interceptor.refreshSuperUserGroupsConfiguration(null));
|
||||
|
||||
// normal request.
|
||||
// There is no return information defined in RefreshSuperUserGroupsConfigurationResponse,
|
||||
// as long as it is not empty, it means that the command is successfully executed.
|
||||
RefreshSuperUserGroupsConfigurationRequest request =
|
||||
RefreshSuperUserGroupsConfigurationRequest.newInstance();
|
||||
RefreshSuperUserGroupsConfigurationResponse response =
|
||||
interceptor.refreshSuperUserGroupsConfiguration(request);
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSC1RefreshSuperUserGroupsConfiguration() throws Exception {
|
||||
|
||||
// case 1, test the existing subCluster (SC-1).
|
||||
String existSubCluster = "SC-1";
|
||||
RefreshSuperUserGroupsConfigurationRequest request =
|
||||
RefreshSuperUserGroupsConfigurationRequest.newInstance(existSubCluster);
|
||||
RefreshSuperUserGroupsConfigurationResponse response =
|
||||
interceptor.refreshSuperUserGroupsConfiguration(request);
|
||||
assertNotNull(response);
|
||||
|
||||
// case 2, test the non-exist subCluster.
|
||||
String notExistsSubCluster = "SC-NON";
|
||||
RefreshSuperUserGroupsConfigurationRequest request1 =
|
||||
RefreshSuperUserGroupsConfigurationRequest.newInstance(notExistsSubCluster);
|
||||
LambdaTestUtils.intercept(Exception.class,
|
||||
"subClusterId = SC-NON is not an active subCluster.",
|
||||
() -> interceptor.refreshSuperUserGroupsConfiguration(request1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshUserToGroupsMappings() throws Exception {
|
||||
// null request.
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"Missing RefreshUserToGroupsMappings request.",
|
||||
() -> interceptor.refreshUserToGroupsMappings(null));
|
||||
|
||||
// normal request.
|
||||
RefreshUserToGroupsMappingsRequest request = RefreshUserToGroupsMappingsRequest.newInstance();
|
||||
RefreshUserToGroupsMappingsResponse response = interceptor.refreshUserToGroupsMappings(request);
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSC1RefreshUserToGroupsMappings() throws Exception {
|
||||
// case 1, test the existing subCluster (SC-1).
|
||||
String existSubCluster = "SC-1";
|
||||
RefreshUserToGroupsMappingsRequest request =
|
||||
RefreshUserToGroupsMappingsRequest.newInstance(existSubCluster);
|
||||
RefreshUserToGroupsMappingsResponse response =
|
||||
interceptor.refreshUserToGroupsMappings(request);
|
||||
assertNotNull(response);
|
||||
|
||||
// case 2, test the non-exist subCluster.
|
||||
String notExistsSubCluster = "SC-NON";
|
||||
RefreshUserToGroupsMappingsRequest request1 =
|
||||
RefreshUserToGroupsMappingsRequest.newInstance(notExistsSubCluster);
|
||||
LambdaTestUtils.intercept(Exception.class,
|
||||
"subClusterId = SC-NON is not an active subCluster.",
|
||||
() -> interceptor.refreshUserToGroupsMappings(request1));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue