YARN-5413. Create a proxy chain for ResourceManager Admin API in the Router. (Giovanni Matteo Fumarola via Subru).
(cherry picked from commit 67846a5519
)
This commit is contained in:
parent
80e1904000
commit
7444406d6d
|
@ -2639,6 +2639,8 @@ public class YarnConfiguration extends Configuration {
|
|||
|
||||
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
|
||||
|
||||
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
|
||||
|
||||
public static final String ROUTER_CLIENTRM_PREFIX =
|
||||
ROUTER_PREFIX + "clientrm.";
|
||||
|
||||
|
@ -2654,9 +2656,23 @@ public class YarnConfiguration extends Configuration {
|
|||
"org.apache.hadoop.yarn.server.router.clientrm."
|
||||
+ "DefaultClientRequestInterceptor";
|
||||
|
||||
public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE =
|
||||
ROUTER_CLIENTRM_PREFIX + "cache-max-size";
|
||||
public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25;
|
||||
public static final String ROUTER_PIPELINE_CACHE_MAX_SIZE =
|
||||
ROUTER_PREFIX + "pipeline.cache-max-size";
|
||||
public static final int DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE = 25;
|
||||
|
||||
public static final String ROUTER_RMADMIN_PREFIX = ROUTER_PREFIX + "rmadmin.";
|
||||
|
||||
public static final String ROUTER_RMADMIN_ADDRESS =
|
||||
ROUTER_RMADMIN_PREFIX + ".address";
|
||||
public static final int DEFAULT_ROUTER_RMADMIN_PORT = 8052;
|
||||
public static final String DEFAULT_ROUTER_RMADMIN_ADDRESS =
|
||||
"0.0.0.0:" + DEFAULT_ROUTER_RMADMIN_PORT;
|
||||
|
||||
public static final String ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE =
|
||||
ROUTER_RMADMIN_PREFIX + "interceptor-class.pipeline";
|
||||
public static final String DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS =
|
||||
"org.apache.hadoop.yarn.server.router.rmadmin."
|
||||
+ "DefaultRMAdminRequestInterceptor";
|
||||
|
||||
////////////////////////////////
|
||||
// Other Configs
|
||||
|
|
|
@ -3177,12 +3177,33 @@
|
|||
|
||||
<property>
|
||||
<description>
|
||||
Size of LRU cache for Router ClientRM Service.
|
||||
Size of LRU cache for Router ClientRM Service and RMAdmin Service.
|
||||
</description>
|
||||
<name>yarn.router.clientrm.cache-max-size</name>
|
||||
<name>yarn.router.pipeline.cache-max-size</name>
|
||||
<value>25</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The comma separated list of class names that implement the
|
||||
RequestInterceptor interface. This is used by the RouterRMAdminService
|
||||
to create the request processing pipeline for users.
|
||||
</description>
|
||||
<name>yarn.router.rmadmin.interceptor-class.pipeline</name>
|
||||
<value>org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The actual address the server will bind to. If this optional address is
|
||||
set, the RPC and webapp servers will bind to this address and the port specified in
|
||||
yarn.router.address and yarn.router.webapp.address, respectively. This is
|
||||
most useful for making Router listen to all interfaces by setting to 0.0.0.0.
|
||||
</description>
|
||||
<name>yarn.router.bind-host</name>
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Comma-separated list of PlacementRules to determine how applications
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test class to validate the correctness of the LRUCacheHashMap.
|
||||
* Test class to validate the correctness of the {@code LRUCacheHashMap}.
|
||||
*
|
||||
*/
|
||||
public class TestLRUCacheHashMap {
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
|
@ -117,6 +118,33 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
|||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -130,8 +158,8 @@ import com.google.common.base.Strings;
|
|||
* implementation is expected by the Router/AMRMProxy unit test cases. So please
|
||||
* change the implementation with care.
|
||||
*/
|
||||
public class MockResourceManagerFacade
|
||||
implements ApplicationClientProtocol, ApplicationMasterProtocol {
|
||||
public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(MockResourceManagerFacade.class);
|
||||
|
@ -508,4 +536,92 @@ public class MockResourceManagerFacade
|
|||
throws YarnException, IOException {
|
||||
return UpdateApplicationTimeoutsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return RefreshQueuesResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return RefreshNodesResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return RefreshSuperUserGroupsConfigurationResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return RefreshUserToGroupsMappingsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||
return RefreshAdminAclsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||
return RefreshServiceAclsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
return UpdateNodeResourceResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||
return RefreshNodesResourcesResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
return AddToClusterNodeLabelsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return RemoveFromClusterNodeLabelsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
return ReplaceLabelsOnNodeResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
throws YarnException, IOException {
|
||||
return CheckForDecommissioningNodesResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
return RefreshClusterMaxPriorityResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
return new String[0];
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
|
||||
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -54,6 +55,7 @@ public class Router extends CompositeService {
|
|||
private Configuration conf;
|
||||
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
||||
private RouterClientRMService clientRMProxyService;
|
||||
private RouterRMAdminService rmAdminProxyService;
|
||||
|
||||
/**
|
||||
* Priority of the Router shutdown hook.
|
||||
|
@ -71,8 +73,12 @@ public class Router extends CompositeService {
|
|||
@Override
|
||||
protected void serviceInit(Configuration config) throws Exception {
|
||||
this.conf = config;
|
||||
// ClientRM Proxy
|
||||
clientRMProxyService = createClientRMProxyService();
|
||||
addService(clientRMProxyService);
|
||||
// RMAdmin Proxy
|
||||
rmAdminProxyService = createRMAdminProxyService();
|
||||
addService(rmAdminProxyService);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -107,6 +113,10 @@ public class Router extends CompositeService {
|
|||
return new RouterClientRMService();
|
||||
}
|
||||
|
||||
protected RouterRMAdminService createRMAdminProxyService() {
|
||||
return new RouterRMAdminService();
|
||||
}
|
||||
|
||||
public static void main(String[] argv) {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
Thread
|
||||
|
|
|
@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.router.clientrm;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Implements the RequestInterceptor interface and provides common functionality
|
||||
* which can can be used and/or extended by other concrete intercepter classes.
|
||||
* Implements the {@link ClientRequestInterceptor} interface and provides common
|
||||
* functionality which can can be used and/or extended by other concrete
|
||||
* intercepter classes.
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractClientRequestInterceptor
|
||||
|
@ -31,7 +32,7 @@ public abstract class AbstractClientRequestInterceptor
|
|||
private ClientRequestInterceptor nextInterceptor;
|
||||
|
||||
/**
|
||||
* Sets the {@code RequestInterceptor} in the chain.
|
||||
* Sets the {@link ClientRequestInterceptor} in the chain.
|
||||
*/
|
||||
@Override
|
||||
public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
|
||||
|
@ -59,7 +60,7 @@ public abstract class AbstractClientRequestInterceptor
|
|||
}
|
||||
|
||||
/**
|
||||
* Initializes the {@code ClientRequestInterceptor}.
|
||||
* Initializes the {@link ClientRequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void init(String user) {
|
||||
|
@ -69,7 +70,7 @@ public abstract class AbstractClientRequestInterceptor
|
|||
}
|
||||
|
||||
/**
|
||||
* Disposes the {@code ClientRequestInterceptor}.
|
||||
* Disposes the {@link ClientRequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
|
|
|
@ -91,7 +91,7 @@ import org.slf4j.LoggerFactory;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Extends the AbstractRequestInterceptorClient class and provides an
|
||||
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
|
||||
* implementation that simply forwards the client requests to the cluster
|
||||
* resource manager.
|
||||
*
|
||||
|
|
|
@ -104,10 +104,11 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
|
||||
/**
|
||||
* RouterClientRMService is a service that runs on each router that can be used
|
||||
* to intercept and inspect ApplicationClientProtocol messages from client to
|
||||
* the cluster resource manager. It listens ApplicationClientProtocol messages
|
||||
* from the client and creates a request intercepting pipeline instance for each
|
||||
* client. The pipeline is a chain of intercepter instances that can inspect and
|
||||
* to intercept and inspect {@link ApplicationClientProtocol} messages from
|
||||
* client to the cluster resource manager. It listens
|
||||
* {@link ApplicationClientProtocol} messages from the client and creates a
|
||||
* request intercepting pipeline instance for each client. The pipeline is a
|
||||
* chain of {@link ClientRequestInterceptor} instances that can inspect and
|
||||
* modify the request/response as needed. The main difference with
|
||||
* AMRMProxyService is the protocol they implement.
|
||||
*/
|
||||
|
@ -137,13 +138,14 @@ public class RouterClientRMService extends AbstractService
|
|||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
this.listenerEndpoint =
|
||||
conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
|
||||
conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
|
||||
YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
|
||||
|
||||
int maxCacheSize =
|
||||
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE);
|
||||
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||
this.userPipelineMap = Collections.synchronizedMap(
|
||||
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||
maxCacheSize, true));
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* Implements the {@link RMAdminRequestInterceptor} interface and provides
|
||||
* common functionality which can can be used and/or extended by other concrete
|
||||
* intercepter classes.
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractRMAdminRequestInterceptor
|
||||
implements RMAdminRequestInterceptor {
|
||||
private Configuration conf;
|
||||
private RMAdminRequestInterceptor nextInterceptor;
|
||||
|
||||
/**
|
||||
* Sets the {@link RMAdminRequestInterceptor} in the chain.
|
||||
*/
|
||||
@Override
|
||||
public void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor) {
|
||||
this.nextInterceptor = nextInterceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link Configuration}.
|
||||
*/
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.setConf(conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the {@link Configuration}.
|
||||
*/
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the {@link RMAdminRequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void init(String user) {
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.init(user);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disposes the {@link RMAdminRequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the next {@link RMAdminRequestInterceptor} in the chain.
|
||||
*/
|
||||
@Override
|
||||
public RMAdminRequestInterceptor getNextInterceptor() {
|
||||
return this.nextInterceptor;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an
|
||||
* implementation that simply forwards the client requests to the cluster
|
||||
* resource manager.
|
||||
*
|
||||
*/
|
||||
public class DefaultRMAdminRequestInterceptor
|
||||
extends AbstractRMAdminRequestInterceptor {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DefaultRMAdminRequestInterceptor.class);
|
||||
private ResourceManagerAdministrationProtocol rmAdminProxy;
|
||||
private UserGroupInformation user = null;
|
||||
|
||||
@Override
|
||||
public void init(String userName) {
|
||||
super.init(userName);
|
||||
try {
|
||||
// Do not create a proxy user if user name matches the user name on
|
||||
// current UGI
|
||||
if (userName.equalsIgnoreCase(
|
||||
UserGroupInformation.getCurrentUser().getUserName())) {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} else {
|
||||
user = UserGroupInformation.createProxyUser(userName,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
}
|
||||
|
||||
final Configuration conf = this.getConf();
|
||||
|
||||
rmAdminProxy = user.doAs(
|
||||
new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
|
||||
@Override
|
||||
public ResourceManagerAdministrationProtocol run()
|
||||
throws Exception {
|
||||
return ClientRMProxy.createRMProxy(conf,
|
||||
ResourceManagerAdministrationProtocol.class);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
String message = "Error while creating Router RMAdmin Service for user:";
|
||||
if (user != null) {
|
||||
message += ", user: " + user;
|
||||
}
|
||||
|
||||
LOG.info(message);
|
||||
throw new YarnRuntimeException(message, e);
|
||||
} catch (Exception e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextInterceptor(RMAdminRequestInterceptor next) {
|
||||
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
||||
+ "DefaultRMAdminRequestInterceptor, which should be the last one "
|
||||
+ "in the chain. Check if the interceptor pipeline configuration "
|
||||
+ "is correct");
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
|
||||
this.rmAdminProxy = rmAdmin;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return rmAdminProxy.refreshQueues(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return rmAdminProxy.refreshNodes(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return rmAdminProxy.refreshSuperUserGroupsConfiguration(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return rmAdminProxy.refreshUserToGroupsMappings(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||
return rmAdminProxy.refreshAdminAcls(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||
return rmAdminProxy.refreshServiceAcls(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
return rmAdminProxy.updateNodeResource(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||
return rmAdminProxy.refreshNodesResources(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
return rmAdminProxy.addToClusterNodeLabels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return rmAdminProxy.removeFromClusterNodeLabels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
return rmAdminProxy.replaceLabelsOnNode(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
throws YarnException, IOException {
|
||||
return rmAdminProxy
|
||||
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
return rmAdminProxy.refreshClusterMaxPriority(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String userName) throws IOException {
|
||||
return rmAdminProxy.getGroupsForUser(userName);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
|
||||
/**
|
||||
* Defines the contract to be implemented by the request intercepter classes,
|
||||
* that can be used to intercept and inspect messages sent from the client to
|
||||
* the resource manager.
|
||||
*/
|
||||
public interface RMAdminRequestInterceptor
|
||||
extends ResourceManagerAdministrationProtocol, Configurable {
|
||||
/**
|
||||
* This method is called for initializing the intercepter. This is guaranteed
|
||||
* to be called only once in the lifetime of this instance.
|
||||
*
|
||||
* @param user the name of the client
|
||||
*/
|
||||
void init(String user);
|
||||
|
||||
/**
|
||||
* This method is called to release the resources held by the intercepter.
|
||||
* This will be called when the application pipeline is being destroyed. The
|
||||
* concrete implementations should dispose the resources and forward the
|
||||
* request to the next intercepter, if any.
|
||||
*/
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Sets the next intercepter in the pipeline. The concrete implementation of
|
||||
* this interface should always pass the request to the nextInterceptor after
|
||||
* inspecting the message. The last intercepter in the chain is responsible to
|
||||
* send the messages to the resource manager service and so the last
|
||||
* intercepter will not receive this method call.
|
||||
*
|
||||
* @param nextInterceptor the RMAdminRequestInterceptor to set in the pipeline
|
||||
*/
|
||||
void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor);
|
||||
|
||||
/**
|
||||
* Returns the next intercepter in the chain.
|
||||
*
|
||||
* @return the next intercepter in the chain
|
||||
*/
|
||||
RMAdminRequestInterceptor getNextInterceptor();
|
||||
|
||||
}
|
|
@ -0,0 +1,423 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* RouterRMAdminService is a service that runs on each router that can be used
|
||||
* to intercept and inspect {@code ResourceManagerAdministrationProtocol}
|
||||
* messages from client to the cluster resource manager. It listens
|
||||
* {@code ResourceManagerAdministrationProtocol} messages from the client and
|
||||
* creates a request intercepting pipeline instance for each client. The
|
||||
* pipeline is a chain of intercepter instances that can inspect and modify the
|
||||
* request/response as needed. The main difference with AMRMProxyService is the
|
||||
* protocol they implement.
|
||||
*/
|
||||
public class RouterRMAdminService extends AbstractService
|
||||
implements ResourceManagerAdministrationProtocol {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RouterRMAdminService.class);
|
||||
|
||||
private Server server;
|
||||
private InetSocketAddress listenerEndpoint;
|
||||
|
||||
// For each user we store an interceptors' pipeline.
|
||||
// For performance issue we use LRU cache to keep in memory the newest ones
|
||||
// and remove the oldest used ones.
|
||||
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
|
||||
|
||||
public RouterRMAdminService() {
|
||||
super(RouterRMAdminService.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
LOG.info("Starting Router RMAdmin Service");
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
this.listenerEndpoint =
|
||||
conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
|
||||
YarnConfiguration.ROUTER_RMADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_PORT);
|
||||
|
||||
int maxCacheSize =
|
||||
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||
this.userPipelineMap = Collections.synchronizedMap(
|
||||
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||
maxCacheSize, true));
|
||||
|
||||
Configuration serverConf = new Configuration(conf);
|
||||
|
||||
int numWorkerThreads =
|
||||
serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT);
|
||||
|
||||
this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
|
||||
this, listenerEndpoint, serverConf, null, numWorkerThreads);
|
||||
|
||||
this.server.start();
|
||||
LOG.info("Router RMAdminService listening on address: "
|
||||
+ this.server.getListenerAddress());
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Stopping Router RMAdminService");
|
||||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
userPipelineMap.clear();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the comma separated intercepter class names from the configuration.
|
||||
*
|
||||
* @param conf
|
||||
* @return the intercepter class names as an instance of ArrayList
|
||||
*/
|
||||
private List<String> getInterceptorClassNames(Configuration conf) {
|
||||
String configuredInterceptorClassNames =
|
||||
conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
Collection<String> tempList =
|
||||
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||
for (String item : tempList) {
|
||||
interceptorClassNames.add(item.trim());
|
||||
}
|
||||
|
||||
return interceptorClassNames;
|
||||
}
|
||||
|
||||
private RequestInterceptorChainWrapper getInterceptorChain()
|
||||
throws IOException {
|
||||
String user = UserGroupInformation.getCurrentUser().getUserName();
|
||||
if (!userPipelineMap.containsKey(user)) {
|
||||
initializePipeline(user);
|
||||
}
|
||||
return userPipelineMap.get(user);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Request intercepter chains for all the users.
|
||||
*
|
||||
* @return the request intercepter chains.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
|
||||
return this.userPipelineMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method creates and returns reference of the first intercepter in the
|
||||
* chain of request intercepter instances.
|
||||
*
|
||||
* @return the reference of the first intercepter in the chain
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected RMAdminRequestInterceptor createRequestInterceptorChain() {
|
||||
Configuration conf = getConfig();
|
||||
|
||||
List<String> interceptorClassNames = getInterceptorClassNames(conf);
|
||||
|
||||
RMAdminRequestInterceptor pipeline = null;
|
||||
RMAdminRequestInterceptor current = null;
|
||||
for (String interceptorClassName : interceptorClassNames) {
|
||||
try {
|
||||
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
|
||||
if (RMAdminRequestInterceptor.class
|
||||
.isAssignableFrom(interceptorClass)) {
|
||||
RMAdminRequestInterceptor interceptorInstance =
|
||||
(RMAdminRequestInterceptor) ReflectionUtils
|
||||
.newInstance(interceptorClass, conf);
|
||||
if (pipeline == null) {
|
||||
pipeline = interceptorInstance;
|
||||
current = interceptorInstance;
|
||||
continue;
|
||||
} else {
|
||||
current.setNextInterceptor(interceptorInstance);
|
||||
current = interceptorInstance;
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException(
|
||||
"Class: " + interceptorClassName + " not instance of "
|
||||
+ RMAdminRequestInterceptor.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not instantiate RMAdminRequestInterceptor: "
|
||||
+ interceptorClassName,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"RequestInterceptor pipeline is not configured in the system");
|
||||
}
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the request intercepter pipeline for the specified user.
|
||||
*
|
||||
* @param user
|
||||
*/
|
||||
private void initializePipeline(String user) {
|
||||
RequestInterceptorChainWrapper chainWrapper = null;
|
||||
synchronized (this.userPipelineMap) {
|
||||
if (this.userPipelineMap.containsKey(user)) {
|
||||
LOG.info("Request to start an already existing user: {}"
|
||||
+ " was received, so ignoring.", user);
|
||||
return;
|
||||
}
|
||||
|
||||
chainWrapper = new RequestInterceptorChainWrapper();
|
||||
this.userPipelineMap.put(user, chainWrapper);
|
||||
}
|
||||
|
||||
// We register the pipeline instance in the map first and then initialize it
|
||||
// later because chain initialization can be expensive and we would like to
|
||||
// release the lock as soon as possible to prevent other applications from
|
||||
// blocking when one application's chain is initializing
|
||||
LOG.info("Initializing request processing pipeline for the user: {}", user);
|
||||
|
||||
try {
|
||||
RMAdminRequestInterceptor interceptorChain =
|
||||
this.createRequestInterceptorChain();
|
||||
interceptorChain.init(user);
|
||||
chainWrapper.init(interceptorChain);
|
||||
} catch (Exception e) {
|
||||
synchronized (this.userPipelineMap) {
|
||||
this.userPipelineMap.remove(user);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Private structure for encapsulating RequestInterceptor and user instances.
|
||||
*
|
||||
*/
|
||||
@Private
|
||||
public static class RequestInterceptorChainWrapper {
|
||||
private RMAdminRequestInterceptor rootInterceptor;
|
||||
|
||||
/**
|
||||
* Initializes the wrapper with the specified parameters.
|
||||
*
|
||||
* @param interceptor the first interceptor in the pipeline
|
||||
*/
|
||||
public synchronized void init(RMAdminRequestInterceptor interceptor) {
|
||||
this.rootInterceptor = interceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the root request intercepter.
|
||||
*
|
||||
* @return the root request intercepter
|
||||
*/
|
||||
public synchronized RMAdminRequestInterceptor getRootInterceptor() {
|
||||
return rootInterceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the chain of interceptors when the object is destroyed.
|
||||
*/
|
||||
@Override
|
||||
protected void finalize() {
|
||||
rootInterceptor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().getGroupsForUser(user);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshQueues(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshNodes(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor()
|
||||
.refreshSuperUserGroupsConfiguration(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshUserToGroupsMappings(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshAdminAcls(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshServiceAcls(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().updateNodeResource(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshNodesResources(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().addToClusterNodeLabels(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().removeFromClusterNodeLabels(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().replaceLabelsOnNode(request);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor()
|
||||
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||
return pipeline.getRootInterceptor().refreshClusterMaxPriority(request);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/** Router RM Admin Proxy Service package. **/
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
|
@ -133,7 +133,7 @@ public abstract class BaseRouterClientRMTest {
|
|||
+ "," + mockPassThroughInterceptorClass + ","
|
||||
+ MockClientRequestInterceptor.class.getName());
|
||||
|
||||
this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
|
||||
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
TEST_MAX_CACHE_SIZE);
|
||||
|
||||
this.dispatcher = new AsyncDispatcher();
|
||||
|
|
|
@ -0,0 +1,346 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Base class for all the RouterRMAdminService test cases. It provides utility
|
||||
* methods that can be used by the concrete test case classes.
|
||||
*
|
||||
*/
|
||||
public abstract class BaseRouterRMAdminTest {
|
||||
|
||||
/**
|
||||
* The RouterRMAdminService instance that will be used by all the test cases.
|
||||
*/
|
||||
private MockRouterRMAdminService rmAdminService;
|
||||
/**
|
||||
* Thread pool used for asynchronous operations.
|
||||
*/
|
||||
private static ExecutorService threadpool = Executors.newCachedThreadPool();
|
||||
private Configuration conf;
|
||||
private AsyncDispatcher dispatcher;
|
||||
|
||||
public final static int TEST_MAX_CACHE_SIZE = 10;
|
||||
|
||||
protected MockRouterRMAdminService getRouterRMAdminService() {
|
||||
Assert.assertNotNull(this.rmAdminService);
|
||||
return this.rmAdminService;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
this.conf = new YarnConfiguration();
|
||||
String mockPassThroughInterceptorClass =
|
||||
PassThroughRMAdminRequestInterceptor.class.getName();
|
||||
|
||||
// Create a request intercepter pipeline for testing. The last one in the
|
||||
// chain will call the mock resource manager. The others in the chain will
|
||||
// simply forward it to the next one in the chain
|
||||
this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
|
||||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||
+ "," + mockPassThroughInterceptorClass + ","
|
||||
+ MockRMAdminRequestInterceptor.class.getName());
|
||||
|
||||
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
TEST_MAX_CACHE_SIZE);
|
||||
|
||||
this.dispatcher = new AsyncDispatcher();
|
||||
this.dispatcher.init(conf);
|
||||
this.dispatcher.start();
|
||||
this.rmAdminService = createAndStartRouterRMAdminService();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (rmAdminService != null) {
|
||||
rmAdminService.stop();
|
||||
rmAdminService = null;
|
||||
}
|
||||
if (this.dispatcher != null) {
|
||||
this.dispatcher.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected ExecutorService getThreadPool() {
|
||||
return threadpool;
|
||||
}
|
||||
|
||||
protected MockRouterRMAdminService createAndStartRouterRMAdminService() {
|
||||
MockRouterRMAdminService svc = new MockRouterRMAdminService();
|
||||
svc.init(conf);
|
||||
svc.start();
|
||||
return svc;
|
||||
}
|
||||
|
||||
protected static class MockRouterRMAdminService extends RouterRMAdminService {
|
||||
public MockRouterRMAdminService() {
|
||||
super();
|
||||
}
|
||||
}
|
||||
|
||||
protected RefreshQueuesResponse refreshQueues(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<RefreshQueuesResponse>() {
|
||||
@Override
|
||||
public RefreshQueuesResponse run() throws Exception {
|
||||
RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
|
||||
RefreshQueuesResponse response =
|
||||
getRouterRMAdminService().refreshQueues(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshNodesResponse refreshNodes(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<RefreshNodesResponse>() {
|
||||
@Override
|
||||
public RefreshNodesResponse run() throws Exception {
|
||||
RefreshNodesRequest req = RefreshNodesRequest.newInstance();
|
||||
RefreshNodesResponse response =
|
||||
getRouterRMAdminService().refreshNodes(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
String user) throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||
new PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse>() {
|
||||
@Override
|
||||
public RefreshSuperUserGroupsConfigurationResponse run()
|
||||
throws Exception {
|
||||
RefreshSuperUserGroupsConfigurationRequest req =
|
||||
RefreshSuperUserGroupsConfigurationRequest.newInstance();
|
||||
RefreshSuperUserGroupsConfigurationResponse response =
|
||||
getRouterRMAdminService()
|
||||
.refreshSuperUserGroupsConfiguration(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
String user) throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||
new PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse>() {
|
||||
@Override
|
||||
public RefreshUserToGroupsMappingsResponse run() throws Exception {
|
||||
RefreshUserToGroupsMappingsRequest req =
|
||||
RefreshUserToGroupsMappingsRequest.newInstance();
|
||||
RefreshUserToGroupsMappingsResponse response =
|
||||
getRouterRMAdminService().refreshUserToGroupsMappings(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshAdminAclsResponse refreshAdminAcls(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<RefreshAdminAclsResponse>() {
|
||||
@Override
|
||||
public RefreshAdminAclsResponse run() throws Exception {
|
||||
RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
|
||||
RefreshAdminAclsResponse response =
|
||||
getRouterRMAdminService().refreshAdminAcls(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshServiceAclsResponse refreshServiceAcls(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<RefreshServiceAclsResponse>() {
|
||||
@Override
|
||||
public RefreshServiceAclsResponse run() throws Exception {
|
||||
RefreshServiceAclsRequest req =
|
||||
RefreshServiceAclsRequest.newInstance();
|
||||
RefreshServiceAclsResponse response =
|
||||
getRouterRMAdminService().refreshServiceAcls(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected UpdateNodeResourceResponse updateNodeResource(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<UpdateNodeResourceResponse>() {
|
||||
@Override
|
||||
public UpdateNodeResourceResponse run() throws Exception {
|
||||
UpdateNodeResourceRequest req =
|
||||
UpdateNodeResourceRequest.newInstance(null);
|
||||
UpdateNodeResourceResponse response =
|
||||
getRouterRMAdminService().updateNodeResource(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshNodesResourcesResponse refreshNodesResources(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<RefreshNodesResourcesResponse>() {
|
||||
@Override
|
||||
public RefreshNodesResourcesResponse run() throws Exception {
|
||||
RefreshNodesResourcesRequest req =
|
||||
RefreshNodesResourcesRequest.newInstance();
|
||||
RefreshNodesResourcesResponse response =
|
||||
getRouterRMAdminService().refreshNodesResources(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<AddToClusterNodeLabelsResponse>() {
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse run() throws Exception {
|
||||
AddToClusterNodeLabelsRequest req =
|
||||
AddToClusterNodeLabelsRequest.newInstance(null);
|
||||
AddToClusterNodeLabelsResponse response =
|
||||
getRouterRMAdminService().addToClusterNodeLabels(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
String user) throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||
new PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse>() {
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse run() throws Exception {
|
||||
RemoveFromClusterNodeLabelsRequest req =
|
||||
RemoveFromClusterNodeLabelsRequest.newInstance(null);
|
||||
RemoveFromClusterNodeLabelsResponse response =
|
||||
getRouterRMAdminService().removeFromClusterNodeLabels(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse>() {
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse run() throws Exception {
|
||||
ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
|
||||
.newInstance(new HashMap<NodeId, Set<String>>());
|
||||
ReplaceLabelsOnNodeResponse response =
|
||||
getRouterRMAdminService().replaceLabelsOnNode(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
String user) throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||
new PrivilegedExceptionAction<CheckForDecommissioningNodesResponse>() {
|
||||
@Override
|
||||
public CheckForDecommissioningNodesResponse run() throws Exception {
|
||||
CheckForDecommissioningNodesRequest req =
|
||||
CheckForDecommissioningNodesRequest.newInstance();
|
||||
CheckForDecommissioningNodesResponse response =
|
||||
getRouterRMAdminService().checkForDecommissioningNodes(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
String user) throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||
new PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse>() {
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse run() throws Exception {
|
||||
RefreshClusterMaxPriorityRequest req =
|
||||
RefreshClusterMaxPriorityRequest.newInstance();
|
||||
RefreshClusterMaxPriorityResponse response =
|
||||
getRouterRMAdminService().refreshClusterMaxPriority(req);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected String[] getGroupsForUser(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return UserGroupInformation.createRemoteUser(user)
|
||||
.doAs(new PrivilegedExceptionAction<String[]>() {
|
||||
@Override
|
||||
public String[] run() throws Exception {
|
||||
String[] response =
|
||||
getRouterRMAdminService().getGroupsForUser(user);
|
||||
return response;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||
|
||||
/**
|
||||
* This class mocks the RMAmdinRequestInterceptor.
|
||||
*/
|
||||
public class MockRMAdminRequestInterceptor
|
||||
extends DefaultRMAdminRequestInterceptor {
|
||||
|
||||
public void init(String user) {
|
||||
MockResourceManagerFacade mockRM = new MockResourceManagerFacade(
|
||||
new YarnConfiguration(super.getConf()), 0);
|
||||
super.setRMAdmin(mockRM);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
|
||||
/**
|
||||
* Mock intercepter that does not do anything other than forwarding it to the
|
||||
* next intercepter in the chain.
|
||||
*/
|
||||
public class PassThroughRMAdminRequestInterceptor
|
||||
extends AbstractRMAdminRequestInterceptor {
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return getNextInterceptor().refreshQueues(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return getNextInterceptor().refreshNodes(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return getNextInterceptor().refreshSuperUserGroupsConfiguration(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
return getNextInterceptor().refreshUserToGroupsMappings(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||
return getNextInterceptor().refreshAdminAcls(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||
return getNextInterceptor().refreshServiceAcls(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
return getNextInterceptor().updateNodeResource(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||
return getNextInterceptor().refreshNodesResources(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
return getNextInterceptor().addToClusterNodeLabels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return getNextInterceptor().removeFromClusterNodeLabels(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
return getNextInterceptor().replaceLabelsOnNode(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
throws YarnException, IOException {
|
||||
return getNextInterceptor()
|
||||
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
return getNextInterceptor().refreshClusterMaxPriority(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
return getNextInterceptor().getGroupsForUser(user);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,219 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService.RequestInterceptorChainWrapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test class to validate the RMAdmin Service inside the Router.
|
||||
*/
|
||||
public class TestRouterRMAdminService extends BaseRouterRMAdminTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestRouterRMAdminService.class);
|
||||
|
||||
/**
|
||||
* Tests if the pipeline is created properly.
|
||||
*/
|
||||
@Test
|
||||
public void testRequestInterceptorChainCreation() throws Exception {
|
||||
RMAdminRequestInterceptor root =
|
||||
super.getRouterRMAdminService().createRequestInterceptorChain();
|
||||
int index = 0;
|
||||
while (root != null) {
|
||||
// The current pipeline is:
|
||||
// PassThroughRMAdminRequestInterceptor - index = 0
|
||||
// PassThroughRMAdminRequestInterceptor - index = 1
|
||||
// PassThroughRMAdminRequestInterceptor - index = 2
|
||||
// MockClientRequestInterceptor - index = 3
|
||||
switch (index) {
|
||||
case 0: // Fall to the next case
|
||||
case 1: // Fall to the next case
|
||||
case 2:
|
||||
// If index is equal to 0,1 or 2 we fall in this check
|
||||
Assert.assertEquals(
|
||||
PassThroughRMAdminRequestInterceptor.class.getName(),
|
||||
root.getClass().getName());
|
||||
break;
|
||||
case 3:
|
||||
Assert.assertEquals(MockRMAdminRequestInterceptor.class.getName(),
|
||||
root.getClass().getName());
|
||||
break;
|
||||
default:
|
||||
Assert.fail();
|
||||
}
|
||||
root = root.getNextInterceptor();
|
||||
index++;
|
||||
}
|
||||
Assert.assertEquals("The number of interceptors in chain does not match", 4,
|
||||
index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the RouterRMAdmin forwards all the requests to the MockRM and get
|
||||
* back the responses.
|
||||
*/
|
||||
@Test
|
||||
public void testRouterRMAdminServiceE2E() throws Exception {
|
||||
|
||||
String user = "test1";
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Queues");
|
||||
|
||||
RefreshQueuesResponse responseRefreshQueues = refreshQueues(user);
|
||||
Assert.assertNotNull(responseRefreshQueues);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes");
|
||||
|
||||
RefreshNodesResponse responseRefreshNodes = refreshNodes(user);
|
||||
Assert.assertNotNull(responseRefreshNodes);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Super User");
|
||||
|
||||
RefreshSuperUserGroupsConfigurationResponse responseRefreshSuperUser =
|
||||
refreshSuperUserGroupsConfiguration(user);
|
||||
Assert.assertNotNull(responseRefreshSuperUser);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh User to Group");
|
||||
|
||||
RefreshUserToGroupsMappingsResponse responseRefreshUserToGroup =
|
||||
refreshUserToGroupsMappings(user);
|
||||
Assert.assertNotNull(responseRefreshUserToGroup);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Admin Acls");
|
||||
|
||||
RefreshAdminAclsResponse responseRefreshAdminAcls = refreshAdminAcls(user);
|
||||
Assert.assertNotNull(responseRefreshAdminAcls);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Service Acls");
|
||||
|
||||
RefreshServiceAclsResponse responseRefreshServiceAcls =
|
||||
refreshServiceAcls(user);
|
||||
Assert.assertNotNull(responseRefreshServiceAcls);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Update Node Resource");
|
||||
|
||||
UpdateNodeResourceResponse responseUpdateNodeResource =
|
||||
updateNodeResource(user);
|
||||
Assert.assertNotNull(responseUpdateNodeResource);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes Resource");
|
||||
|
||||
RefreshNodesResourcesResponse responseRefreshNodesResources =
|
||||
refreshNodesResources(user);
|
||||
Assert.assertNotNull(responseRefreshNodesResources);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Add To Cluster NodeLabels");
|
||||
|
||||
AddToClusterNodeLabelsResponse responseAddToClusterNodeLabels =
|
||||
addToClusterNodeLabels(user);
|
||||
Assert.assertNotNull(responseAddToClusterNodeLabels);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Remove To Cluster NodeLabels");
|
||||
|
||||
RemoveFromClusterNodeLabelsResponse responseRemoveFromClusterNodeLabels =
|
||||
removeFromClusterNodeLabels(user);
|
||||
Assert.assertNotNull(responseRemoveFromClusterNodeLabels);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Replace Labels On Node");
|
||||
|
||||
ReplaceLabelsOnNodeResponse responseReplaceLabelsOnNode =
|
||||
replaceLabelsOnNode(user);
|
||||
Assert.assertNotNull(responseReplaceLabelsOnNode);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Check For Decommissioning Nodes");
|
||||
|
||||
CheckForDecommissioningNodesResponse responseCheckForDecom =
|
||||
checkForDecommissioningNodes(user);
|
||||
Assert.assertNotNull(responseCheckForDecom);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Refresh Cluster Max Priority");
|
||||
|
||||
RefreshClusterMaxPriorityResponse responseRefreshClusterMaxPriority =
|
||||
refreshClusterMaxPriority(user);
|
||||
Assert.assertNotNull(responseRefreshClusterMaxPriority);
|
||||
|
||||
LOG.info("testRouterRMAdminServiceE2E - Get Groups For User");
|
||||
|
||||
String[] responseGetGroupsForUser = getGroupsForUser(user);
|
||||
Assert.assertNotNull(responseGetGroupsForUser);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if the different chains for users are generated, and LRU cache is
|
||||
* working as expected.
|
||||
*/
|
||||
@Test
|
||||
public void testUsersChainMapWithLRUCache()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
Map<String, RequestInterceptorChainWrapper> pipelines;
|
||||
RequestInterceptorChainWrapper chain;
|
||||
|
||||
refreshQueues("test1");
|
||||
refreshQueues("test2");
|
||||
refreshQueues("test3");
|
||||
refreshQueues("test4");
|
||||
refreshQueues("test5");
|
||||
refreshQueues("test6");
|
||||
refreshQueues("test7");
|
||||
refreshQueues("test8");
|
||||
|
||||
pipelines = super.getRouterRMAdminService().getPipelines();
|
||||
Assert.assertEquals(8, pipelines.size());
|
||||
|
||||
refreshQueues("test9");
|
||||
refreshQueues("test10");
|
||||
refreshQueues("test1");
|
||||
refreshQueues("test11");
|
||||
|
||||
// The cache max size is defined in
|
||||
// BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE
|
||||
Assert.assertEquals(10, pipelines.size());
|
||||
|
||||
chain = pipelines.get("test1");
|
||||
Assert.assertNotNull("test1 should not be evicted", chain);
|
||||
|
||||
chain = pipelines.get("test2");
|
||||
Assert.assertNull("test2 should have been evicted", chain);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue