YARN-5413. Create a proxy chain for ResourceManager Admin API in the Router. (Giovanni Matteo Fumarola via Subru).
This commit is contained in:
parent
4846069061
commit
67846a5519
|
@ -2639,6 +2639,8 @@ public class YarnConfiguration extends Configuration {
|
||||||
|
|
||||||
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
|
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
|
||||||
|
|
||||||
|
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
|
||||||
|
|
||||||
public static final String ROUTER_CLIENTRM_PREFIX =
|
public static final String ROUTER_CLIENTRM_PREFIX =
|
||||||
ROUTER_PREFIX + "clientrm.";
|
ROUTER_PREFIX + "clientrm.";
|
||||||
|
|
||||||
|
@ -2654,9 +2656,23 @@ public class YarnConfiguration extends Configuration {
|
||||||
"org.apache.hadoop.yarn.server.router.clientrm."
|
"org.apache.hadoop.yarn.server.router.clientrm."
|
||||||
+ "DefaultClientRequestInterceptor";
|
+ "DefaultClientRequestInterceptor";
|
||||||
|
|
||||||
public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE =
|
public static final String ROUTER_PIPELINE_CACHE_MAX_SIZE =
|
||||||
ROUTER_CLIENTRM_PREFIX + "cache-max-size";
|
ROUTER_PREFIX + "pipeline.cache-max-size";
|
||||||
public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25;
|
public static final int DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE = 25;
|
||||||
|
|
||||||
|
public static final String ROUTER_RMADMIN_PREFIX = ROUTER_PREFIX + "rmadmin.";
|
||||||
|
|
||||||
|
public static final String ROUTER_RMADMIN_ADDRESS =
|
||||||
|
ROUTER_RMADMIN_PREFIX + ".address";
|
||||||
|
public static final int DEFAULT_ROUTER_RMADMIN_PORT = 8052;
|
||||||
|
public static final String DEFAULT_ROUTER_RMADMIN_ADDRESS =
|
||||||
|
"0.0.0.0:" + DEFAULT_ROUTER_RMADMIN_PORT;
|
||||||
|
|
||||||
|
public static final String ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE =
|
||||||
|
ROUTER_RMADMIN_PREFIX + "interceptor-class.pipeline";
|
||||||
|
public static final String DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS =
|
||||||
|
"org.apache.hadoop.yarn.server.router.rmadmin."
|
||||||
|
+ "DefaultRMAdminRequestInterceptor";
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// Other Configs
|
// Other Configs
|
||||||
|
|
|
@ -3177,12 +3177,33 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
Size of LRU cache for Router ClientRM Service.
|
Size of LRU cache for Router ClientRM Service and RMAdmin Service.
|
||||||
</description>
|
</description>
|
||||||
<name>yarn.router.clientrm.cache-max-size</name>
|
<name>yarn.router.pipeline.cache-max-size</name>
|
||||||
<value>25</value>
|
<value>25</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The comma separated list of class names that implement the
|
||||||
|
RequestInterceptor interface. This is used by the RouterRMAdminService
|
||||||
|
to create the request processing pipeline for users.
|
||||||
|
</description>
|
||||||
|
<name>yarn.router.rmadmin.interceptor-class.pipeline</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The actual address the server will bind to. If this optional address is
|
||||||
|
set, the RPC and webapp servers will bind to this address and the port specified in
|
||||||
|
yarn.router.address and yarn.router.webapp.address, respectively. This is
|
||||||
|
most useful for making Router listen to all interfaces by setting to 0.0.0.0.
|
||||||
|
</description>
|
||||||
|
<name>yarn.router.bind-host</name>
|
||||||
|
<value></value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
Comma-separated list of PlacementRules to determine how applications
|
Comma-separated list of PlacementRules to determine how applications
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test class to validate the correctness of the LRUCacheHashMap.
|
* Test class to validate the correctness of the {@code LRUCacheHashMap}.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class TestLRUCacheHashMap {
|
public class TestLRUCacheHashMap {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
@ -117,6 +118,33 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -130,8 +158,8 @@ import com.google.common.base.Strings;
|
||||||
* implementation is expected by the Router/AMRMProxy unit test cases. So please
|
* implementation is expected by the Router/AMRMProxy unit test cases. So please
|
||||||
* change the implementation with care.
|
* change the implementation with care.
|
||||||
*/
|
*/
|
||||||
public class MockResourceManagerFacade
|
public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
||||||
implements ApplicationClientProtocol, ApplicationMasterProtocol {
|
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(MockResourceManagerFacade.class);
|
LoggerFactory.getLogger(MockResourceManagerFacade.class);
|
||||||
|
@ -508,4 +536,92 @@ public class MockResourceManagerFacade
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
return UpdateApplicationTimeoutsResponse.newInstance();
|
return UpdateApplicationTimeoutsResponse.newInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return RefreshQueuesResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return RefreshNodesResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||||
|
RefreshSuperUserGroupsConfigurationRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return RefreshSuperUserGroupsConfigurationResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||||
|
RefreshUserToGroupsMappingsRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return RefreshUserToGroupsMappingsResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||||
|
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||||
|
return RefreshAdminAclsResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||||
|
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||||
|
return RefreshServiceAclsResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UpdateNodeResourceResponse updateNodeResource(
|
||||||
|
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||||
|
return UpdateNodeResourceResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||||
|
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||||
|
return RefreshNodesResourcesResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||||
|
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
|
return AddToClusterNodeLabelsResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||||
|
RemoveFromClusterNodeLabelsRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return RemoveFromClusterNodeLabelsResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||||
|
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||||
|
return ReplaceLabelsOnNodeResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||||
|
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return CheckForDecommissioningNodesResponse.newInstance(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||||
|
RefreshClusterMaxPriorityRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return RefreshClusterMaxPriorityResponse.newInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getGroupsForUser(String user) throws IOException {
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
|
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
|
||||||
|
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -54,6 +55,7 @@ public class Router extends CompositeService {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
||||||
private RouterClientRMService clientRMProxyService;
|
private RouterClientRMService clientRMProxyService;
|
||||||
|
private RouterRMAdminService rmAdminProxyService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Priority of the Router shutdown hook.
|
* Priority of the Router shutdown hook.
|
||||||
|
@ -71,8 +73,12 @@ public class Router extends CompositeService {
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration config) throws Exception {
|
protected void serviceInit(Configuration config) throws Exception {
|
||||||
this.conf = config;
|
this.conf = config;
|
||||||
|
// ClientRM Proxy
|
||||||
clientRMProxyService = createClientRMProxyService();
|
clientRMProxyService = createClientRMProxyService();
|
||||||
addService(clientRMProxyService);
|
addService(clientRMProxyService);
|
||||||
|
// RMAdmin Proxy
|
||||||
|
rmAdminProxyService = createRMAdminProxyService();
|
||||||
|
addService(rmAdminProxyService);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +113,10 @@ public class Router extends CompositeService {
|
||||||
return new RouterClientRMService();
|
return new RouterClientRMService();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected RouterRMAdminService createRMAdminProxyService() {
|
||||||
|
return new RouterRMAdminService();
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] argv) {
|
public static void main(String[] argv) {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
Thread
|
Thread
|
||||||
|
|
|
@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.router.clientrm;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the RequestInterceptor interface and provides common functionality
|
* Implements the {@link ClientRequestInterceptor} interface and provides common
|
||||||
* which can can be used and/or extended by other concrete intercepter classes.
|
* functionality which can can be used and/or extended by other concrete
|
||||||
|
* intercepter classes.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractClientRequestInterceptor
|
public abstract class AbstractClientRequestInterceptor
|
||||||
|
@ -31,7 +32,7 @@ public abstract class AbstractClientRequestInterceptor
|
||||||
private ClientRequestInterceptor nextInterceptor;
|
private ClientRequestInterceptor nextInterceptor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the {@code RequestInterceptor} in the chain.
|
* Sets the {@link ClientRequestInterceptor} in the chain.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
|
public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
|
||||||
|
@ -59,7 +60,7 @@ public abstract class AbstractClientRequestInterceptor
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the {@code ClientRequestInterceptor}.
|
* Initializes the {@link ClientRequestInterceptor}.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void init(String user) {
|
public void init(String user) {
|
||||||
|
@ -69,7 +70,7 @@ public abstract class AbstractClientRequestInterceptor
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disposes the {@code ClientRequestInterceptor}.
|
* Disposes the {@link ClientRequestInterceptor}.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
|
|
|
@ -91,7 +91,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extends the AbstractRequestInterceptorClient class and provides an
|
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
|
||||||
* implementation that simply forwards the client requests to the cluster
|
* implementation that simply forwards the client requests to the cluster
|
||||||
* resource manager.
|
* resource manager.
|
||||||
*
|
*
|
||||||
|
|
|
@ -104,10 +104,11 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RouterClientRMService is a service that runs on each router that can be used
|
* RouterClientRMService is a service that runs on each router that can be used
|
||||||
* to intercept and inspect ApplicationClientProtocol messages from client to
|
* to intercept and inspect {@link ApplicationClientProtocol} messages from
|
||||||
* the cluster resource manager. It listens ApplicationClientProtocol messages
|
* client to the cluster resource manager. It listens
|
||||||
* from the client and creates a request intercepting pipeline instance for each
|
* {@link ApplicationClientProtocol} messages from the client and creates a
|
||||||
* client. The pipeline is a chain of intercepter instances that can inspect and
|
* request intercepting pipeline instance for each client. The pipeline is a
|
||||||
|
* chain of {@link ClientRequestInterceptor} instances that can inspect and
|
||||||
* modify the request/response as needed. The main difference with
|
* modify the request/response as needed. The main difference with
|
||||||
* AMRMProxyService is the protocol they implement.
|
* AMRMProxyService is the protocol they implement.
|
||||||
*/
|
*/
|
||||||
|
@ -137,13 +138,14 @@ public class RouterClientRMService extends AbstractService
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
||||||
this.listenerEndpoint =
|
this.listenerEndpoint =
|
||||||
conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
|
conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
|
||||||
|
YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
|
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
|
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
|
||||||
|
|
||||||
int maxCacheSize =
|
int maxCacheSize =
|
||||||
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
|
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE);
|
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||||
this.userPipelineMap = Collections.synchronizedMap(
|
this.userPipelineMap = Collections.synchronizedMap(
|
||||||
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||||
maxCacheSize, true));
|
maxCacheSize, true));
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements the {@link RMAdminRequestInterceptor} interface and provides
|
||||||
|
* common functionality which can can be used and/or extended by other concrete
|
||||||
|
* intercepter classes.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AbstractRMAdminRequestInterceptor
|
||||||
|
implements RMAdminRequestInterceptor {
|
||||||
|
private Configuration conf;
|
||||||
|
private RMAdminRequestInterceptor nextInterceptor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the {@link RMAdminRequestInterceptor} in the chain.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor) {
|
||||||
|
this.nextInterceptor = nextInterceptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the {@link Configuration}.
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
if (this.nextInterceptor != null) {
|
||||||
|
this.nextInterceptor.setConf(conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the {@link Configuration}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return this.conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the {@link RMAdminRequestInterceptor}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void init(String user) {
|
||||||
|
if (this.nextInterceptor != null) {
|
||||||
|
this.nextInterceptor.init(user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disposes the {@link RMAdminRequestInterceptor}.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
if (this.nextInterceptor != null) {
|
||||||
|
this.nextInterceptor.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the next {@link RMAdminRequestInterceptor} in the chain.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public RMAdminRequestInterceptor getNextInterceptor() {
|
||||||
|
return this.nextInterceptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,215 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an
|
||||||
|
* implementation that simply forwards the client requests to the cluster
|
||||||
|
* resource manager.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class DefaultRMAdminRequestInterceptor
|
||||||
|
extends AbstractRMAdminRequestInterceptor {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(DefaultRMAdminRequestInterceptor.class);
|
||||||
|
private ResourceManagerAdministrationProtocol rmAdminProxy;
|
||||||
|
private UserGroupInformation user = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(String userName) {
|
||||||
|
super.init(userName);
|
||||||
|
try {
|
||||||
|
// Do not create a proxy user if user name matches the user name on
|
||||||
|
// current UGI
|
||||||
|
if (userName.equalsIgnoreCase(
|
||||||
|
UserGroupInformation.getCurrentUser().getUserName())) {
|
||||||
|
user = UserGroupInformation.getCurrentUser();
|
||||||
|
} else {
|
||||||
|
user = UserGroupInformation.createProxyUser(userName,
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
final Configuration conf = this.getConf();
|
||||||
|
|
||||||
|
rmAdminProxy = user.doAs(
|
||||||
|
new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
|
||||||
|
@Override
|
||||||
|
public ResourceManagerAdministrationProtocol run()
|
||||||
|
throws Exception {
|
||||||
|
return ClientRMProxy.createRMProxy(conf,
|
||||||
|
ResourceManagerAdministrationProtocol.class);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
String message = "Error while creating Router RMAdmin Service for user:";
|
||||||
|
if (user != null) {
|
||||||
|
message += ", user: " + user;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(message);
|
||||||
|
throw new YarnRuntimeException(message, e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new YarnRuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNextInterceptor(RMAdminRequestInterceptor next) {
|
||||||
|
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
||||||
|
+ "DefaultRMAdminRequestInterceptor, which should be the last one "
|
||||||
|
+ "in the chain. Check if the interceptor pipeline configuration "
|
||||||
|
+ "is correct");
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
|
||||||
|
this.rmAdminProxy = rmAdmin;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshQueues(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshNodes(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||||
|
RefreshSuperUserGroupsConfigurationRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshSuperUserGroupsConfiguration(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||||
|
RefreshUserToGroupsMappingsRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshUserToGroupsMappings(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||||
|
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshAdminAcls(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||||
|
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshServiceAcls(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UpdateNodeResourceResponse updateNodeResource(
|
||||||
|
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.updateNodeResource(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||||
|
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshNodesResources(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||||
|
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.addToClusterNodeLabels(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||||
|
RemoveFromClusterNodeLabelsRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.removeFromClusterNodeLabels(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||||
|
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.replaceLabelsOnNode(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||||
|
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return rmAdminProxy
|
||||||
|
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||||
|
RefreshClusterMaxPriorityRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return rmAdminProxy.refreshClusterMaxPriority(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getGroupsForUser(String userName) throws IOException {
|
||||||
|
return rmAdminProxy.getGroupsForUser(userName);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines the contract to be implemented by the request intercepter classes,
|
||||||
|
* that can be used to intercept and inspect messages sent from the client to
|
||||||
|
* the resource manager.
|
||||||
|
*/
|
||||||
|
public interface RMAdminRequestInterceptor
|
||||||
|
extends ResourceManagerAdministrationProtocol, Configurable {
|
||||||
|
/**
|
||||||
|
* This method is called for initializing the intercepter. This is guaranteed
|
||||||
|
* to be called only once in the lifetime of this instance.
|
||||||
|
*
|
||||||
|
* @param user the name of the client
|
||||||
|
*/
|
||||||
|
void init(String user);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is called to release the resources held by the intercepter.
|
||||||
|
* This will be called when the application pipeline is being destroyed. The
|
||||||
|
* concrete implementations should dispose the resources and forward the
|
||||||
|
* request to the next intercepter, if any.
|
||||||
|
*/
|
||||||
|
void shutdown();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the next intercepter in the pipeline. The concrete implementation of
|
||||||
|
* this interface should always pass the request to the nextInterceptor after
|
||||||
|
* inspecting the message. The last intercepter in the chain is responsible to
|
||||||
|
* send the messages to the resource manager service and so the last
|
||||||
|
* intercepter will not receive this method call.
|
||||||
|
*
|
||||||
|
* @param nextInterceptor the RMAdminRequestInterceptor to set in the pipeline
|
||||||
|
*/
|
||||||
|
void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the next intercepter in the chain.
|
||||||
|
*
|
||||||
|
* @return the next intercepter in the chain
|
||||||
|
*/
|
||||||
|
RMAdminRequestInterceptor getNextInterceptor();
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,423 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RouterRMAdminService is a service that runs on each router that can be used
|
||||||
|
* to intercept and inspect {@code ResourceManagerAdministrationProtocol}
|
||||||
|
* messages from client to the cluster resource manager. It listens
|
||||||
|
* {@code ResourceManagerAdministrationProtocol} messages from the client and
|
||||||
|
* creates a request intercepting pipeline instance for each client. The
|
||||||
|
* pipeline is a chain of intercepter instances that can inspect and modify the
|
||||||
|
* request/response as needed. The main difference with AMRMProxyService is the
|
||||||
|
* protocol they implement.
|
||||||
|
*/
|
||||||
|
public class RouterRMAdminService extends AbstractService
|
||||||
|
implements ResourceManagerAdministrationProtocol {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(RouterRMAdminService.class);
|
||||||
|
|
||||||
|
private Server server;
|
||||||
|
private InetSocketAddress listenerEndpoint;
|
||||||
|
|
||||||
|
// For each user we store an interceptors' pipeline.
|
||||||
|
// For performance issue we use LRU cache to keep in memory the newest ones
|
||||||
|
// and remove the oldest used ones.
|
||||||
|
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
|
||||||
|
|
||||||
|
public RouterRMAdminService() {
|
||||||
|
super(RouterRMAdminService.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
LOG.info("Starting Router RMAdmin Service");
|
||||||
|
Configuration conf = getConfig();
|
||||||
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
||||||
|
this.listenerEndpoint =
|
||||||
|
conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
|
||||||
|
YarnConfiguration.ROUTER_RMADMIN_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_PORT);
|
||||||
|
|
||||||
|
int maxCacheSize =
|
||||||
|
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||||
|
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||||
|
this.userPipelineMap = Collections.synchronizedMap(
|
||||||
|
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||||
|
maxCacheSize, true));
|
||||||
|
|
||||||
|
Configuration serverConf = new Configuration(conf);
|
||||||
|
|
||||||
|
int numWorkerThreads =
|
||||||
|
serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT);
|
||||||
|
|
||||||
|
this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
|
||||||
|
this, listenerEndpoint, serverConf, null, numWorkerThreads);
|
||||||
|
|
||||||
|
this.server.start();
|
||||||
|
LOG.info("Router RMAdminService listening on address: "
|
||||||
|
+ this.server.getListenerAddress());
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
LOG.info("Stopping Router RMAdminService");
|
||||||
|
if (this.server != null) {
|
||||||
|
this.server.stop();
|
||||||
|
}
|
||||||
|
userPipelineMap.clear();
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the comma separated intercepter class names from the configuration.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @return the intercepter class names as an instance of ArrayList
|
||||||
|
*/
|
||||||
|
private List<String> getInterceptorClassNames(Configuration conf) {
|
||||||
|
String configuredInterceptorClassNames =
|
||||||
|
conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
|
||||||
|
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
|
||||||
|
|
||||||
|
List<String> interceptorClassNames = new ArrayList<String>();
|
||||||
|
Collection<String> tempList =
|
||||||
|
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||||
|
for (String item : tempList) {
|
||||||
|
interceptorClassNames.add(item.trim());
|
||||||
|
}
|
||||||
|
|
||||||
|
return interceptorClassNames;
|
||||||
|
}
|
||||||
|
|
||||||
|
private RequestInterceptorChainWrapper getInterceptorChain()
|
||||||
|
throws IOException {
|
||||||
|
String user = UserGroupInformation.getCurrentUser().getUserName();
|
||||||
|
if (!userPipelineMap.containsKey(user)) {
|
||||||
|
initializePipeline(user);
|
||||||
|
}
|
||||||
|
return userPipelineMap.get(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the Request intercepter chains for all the users.
|
||||||
|
*
|
||||||
|
* @return the request intercepter chains.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
|
||||||
|
return this.userPipelineMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method creates and returns reference of the first intercepter in the
|
||||||
|
* chain of request intercepter instances.
|
||||||
|
*
|
||||||
|
* @return the reference of the first intercepter in the chain
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected RMAdminRequestInterceptor createRequestInterceptorChain() {
|
||||||
|
Configuration conf = getConfig();
|
||||||
|
|
||||||
|
List<String> interceptorClassNames = getInterceptorClassNames(conf);
|
||||||
|
|
||||||
|
RMAdminRequestInterceptor pipeline = null;
|
||||||
|
RMAdminRequestInterceptor current = null;
|
||||||
|
for (String interceptorClassName : interceptorClassNames) {
|
||||||
|
try {
|
||||||
|
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
|
||||||
|
if (RMAdminRequestInterceptor.class
|
||||||
|
.isAssignableFrom(interceptorClass)) {
|
||||||
|
RMAdminRequestInterceptor interceptorInstance =
|
||||||
|
(RMAdminRequestInterceptor) ReflectionUtils
|
||||||
|
.newInstance(interceptorClass, conf);
|
||||||
|
if (pipeline == null) {
|
||||||
|
pipeline = interceptorInstance;
|
||||||
|
current = interceptorInstance;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
current.setNextInterceptor(interceptorInstance);
|
||||||
|
current = interceptorInstance;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"Class: " + interceptorClassName + " not instance of "
|
||||||
|
+ RMAdminRequestInterceptor.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"Could not instantiate RMAdminRequestInterceptor: "
|
||||||
|
+ interceptorClassName,
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pipeline == null) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"RequestInterceptor pipeline is not configured in the system");
|
||||||
|
}
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the request intercepter pipeline for the specified user.
|
||||||
|
*
|
||||||
|
* @param user
|
||||||
|
*/
|
||||||
|
private void initializePipeline(String user) {
|
||||||
|
RequestInterceptorChainWrapper chainWrapper = null;
|
||||||
|
synchronized (this.userPipelineMap) {
|
||||||
|
if (this.userPipelineMap.containsKey(user)) {
|
||||||
|
LOG.info("Request to start an already existing user: {}"
|
||||||
|
+ " was received, so ignoring.", user);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
chainWrapper = new RequestInterceptorChainWrapper();
|
||||||
|
this.userPipelineMap.put(user, chainWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We register the pipeline instance in the map first and then initialize it
|
||||||
|
// later because chain initialization can be expensive and we would like to
|
||||||
|
// release the lock as soon as possible to prevent other applications from
|
||||||
|
// blocking when one application's chain is initializing
|
||||||
|
LOG.info("Initializing request processing pipeline for the user: {}", user);
|
||||||
|
|
||||||
|
try {
|
||||||
|
RMAdminRequestInterceptor interceptorChain =
|
||||||
|
this.createRequestInterceptorChain();
|
||||||
|
interceptorChain.init(user);
|
||||||
|
chainWrapper.init(interceptorChain);
|
||||||
|
} catch (Exception e) {
|
||||||
|
synchronized (this.userPipelineMap) {
|
||||||
|
this.userPipelineMap.remove(user);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Private structure for encapsulating RequestInterceptor and user instances.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public static class RequestInterceptorChainWrapper {
|
||||||
|
private RMAdminRequestInterceptor rootInterceptor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the wrapper with the specified parameters.
|
||||||
|
*
|
||||||
|
* @param interceptor the first interceptor in the pipeline
|
||||||
|
*/
|
||||||
|
public synchronized void init(RMAdminRequestInterceptor interceptor) {
|
||||||
|
this.rootInterceptor = interceptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the root request intercepter.
|
||||||
|
*
|
||||||
|
* @return the root request intercepter
|
||||||
|
*/
|
||||||
|
public synchronized RMAdminRequestInterceptor getRootInterceptor() {
|
||||||
|
return rootInterceptor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown the chain of interceptors when the object is destroyed.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void finalize() {
|
||||||
|
rootInterceptor.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getGroupsForUser(String user) throws IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().getGroupsForUser(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshQueues(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshNodes(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||||
|
RefreshSuperUserGroupsConfigurationRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor()
|
||||||
|
.refreshSuperUserGroupsConfiguration(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||||
|
RefreshUserToGroupsMappingsRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshUserToGroupsMappings(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||||
|
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshAdminAcls(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||||
|
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshServiceAcls(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UpdateNodeResourceResponse updateNodeResource(
|
||||||
|
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().updateNodeResource(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||||
|
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshNodesResources(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||||
|
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().addToClusterNodeLabels(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||||
|
RemoveFromClusterNodeLabelsRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().removeFromClusterNodeLabels(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||||
|
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().replaceLabelsOnNode(request);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||||
|
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor()
|
||||||
|
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||||
|
RefreshClusterMaxPriorityRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
|
||||||
|
return pipeline.getRootInterceptor().refreshClusterMaxPriority(request);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Router RM Admin Proxy Service package. **/
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
|
@ -133,7 +133,7 @@ public abstract class BaseRouterClientRMTest {
|
||||||
+ "," + mockPassThroughInterceptorClass + ","
|
+ "," + mockPassThroughInterceptorClass + ","
|
||||||
+ MockClientRequestInterceptor.class.getName());
|
+ MockClientRequestInterceptor.class.getName());
|
||||||
|
|
||||||
this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
|
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||||
TEST_MAX_CACHE_SIZE);
|
TEST_MAX_CACHE_SIZE);
|
||||||
|
|
||||||
this.dispatcher = new AsyncDispatcher();
|
this.dispatcher = new AsyncDispatcher();
|
||||||
|
|
|
@ -0,0 +1,346 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for all the RouterRMAdminService test cases. It provides utility
|
||||||
|
* methods that can be used by the concrete test case classes.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class BaseRouterRMAdminTest {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The RouterRMAdminService instance that will be used by all the test cases.
|
||||||
|
*/
|
||||||
|
private MockRouterRMAdminService rmAdminService;
|
||||||
|
/**
|
||||||
|
* Thread pool used for asynchronous operations.
|
||||||
|
*/
|
||||||
|
private static ExecutorService threadpool = Executors.newCachedThreadPool();
|
||||||
|
private Configuration conf;
|
||||||
|
private AsyncDispatcher dispatcher;
|
||||||
|
|
||||||
|
public final static int TEST_MAX_CACHE_SIZE = 10;
|
||||||
|
|
||||||
|
protected MockRouterRMAdminService getRouterRMAdminService() {
|
||||||
|
Assert.assertNotNull(this.rmAdminService);
|
||||||
|
return this.rmAdminService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
this.conf = new YarnConfiguration();
|
||||||
|
String mockPassThroughInterceptorClass =
|
||||||
|
PassThroughRMAdminRequestInterceptor.class.getName();
|
||||||
|
|
||||||
|
// Create a request intercepter pipeline for testing. The last one in the
|
||||||
|
// chain will call the mock resource manager. The others in the chain will
|
||||||
|
// simply forward it to the next one in the chain
|
||||||
|
this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
|
||||||
|
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||||
|
+ "," + mockPassThroughInterceptorClass + ","
|
||||||
|
+ MockRMAdminRequestInterceptor.class.getName());
|
||||||
|
|
||||||
|
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||||
|
TEST_MAX_CACHE_SIZE);
|
||||||
|
|
||||||
|
this.dispatcher = new AsyncDispatcher();
|
||||||
|
this.dispatcher.init(conf);
|
||||||
|
this.dispatcher.start();
|
||||||
|
this.rmAdminService = createAndStartRouterRMAdminService();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (rmAdminService != null) {
|
||||||
|
rmAdminService.stop();
|
||||||
|
rmAdminService = null;
|
||||||
|
}
|
||||||
|
if (this.dispatcher != null) {
|
||||||
|
this.dispatcher.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ExecutorService getThreadPool() {
|
||||||
|
return threadpool;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MockRouterRMAdminService createAndStartRouterRMAdminService() {
|
||||||
|
MockRouterRMAdminService svc = new MockRouterRMAdminService();
|
||||||
|
svc.init(conf);
|
||||||
|
svc.start();
|
||||||
|
return svc;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class MockRouterRMAdminService extends RouterRMAdminService {
|
||||||
|
public MockRouterRMAdminService() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshQueuesResponse refreshQueues(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<RefreshQueuesResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshQueuesResponse run() throws Exception {
|
||||||
|
RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
|
||||||
|
RefreshQueuesResponse response =
|
||||||
|
getRouterRMAdminService().refreshQueues(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshNodesResponse refreshNodes(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<RefreshNodesResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResponse run() throws Exception {
|
||||||
|
RefreshNodesRequest req = RefreshNodesRequest.newInstance();
|
||||||
|
RefreshNodesResponse response =
|
||||||
|
getRouterRMAdminService().refreshNodes(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||||
|
String user) throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||||
|
new PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshSuperUserGroupsConfigurationResponse run()
|
||||||
|
throws Exception {
|
||||||
|
RefreshSuperUserGroupsConfigurationRequest req =
|
||||||
|
RefreshSuperUserGroupsConfigurationRequest.newInstance();
|
||||||
|
RefreshSuperUserGroupsConfigurationResponse response =
|
||||||
|
getRouterRMAdminService()
|
||||||
|
.refreshSuperUserGroupsConfiguration(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||||
|
String user) throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||||
|
new PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshUserToGroupsMappingsResponse run() throws Exception {
|
||||||
|
RefreshUserToGroupsMappingsRequest req =
|
||||||
|
RefreshUserToGroupsMappingsRequest.newInstance();
|
||||||
|
RefreshUserToGroupsMappingsResponse response =
|
||||||
|
getRouterRMAdminService().refreshUserToGroupsMappings(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshAdminAclsResponse refreshAdminAcls(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<RefreshAdminAclsResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshAdminAclsResponse run() throws Exception {
|
||||||
|
RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
|
||||||
|
RefreshAdminAclsResponse response =
|
||||||
|
getRouterRMAdminService().refreshAdminAcls(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshServiceAclsResponse refreshServiceAcls(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<RefreshServiceAclsResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshServiceAclsResponse run() throws Exception {
|
||||||
|
RefreshServiceAclsRequest req =
|
||||||
|
RefreshServiceAclsRequest.newInstance();
|
||||||
|
RefreshServiceAclsResponse response =
|
||||||
|
getRouterRMAdminService().refreshServiceAcls(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected UpdateNodeResourceResponse updateNodeResource(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<UpdateNodeResourceResponse>() {
|
||||||
|
@Override
|
||||||
|
public UpdateNodeResourceResponse run() throws Exception {
|
||||||
|
UpdateNodeResourceRequest req =
|
||||||
|
UpdateNodeResourceRequest.newInstance(null);
|
||||||
|
UpdateNodeResourceResponse response =
|
||||||
|
getRouterRMAdminService().updateNodeResource(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshNodesResourcesResponse refreshNodesResources(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<RefreshNodesResourcesResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResourcesResponse run() throws Exception {
|
||||||
|
RefreshNodesResourcesRequest req =
|
||||||
|
RefreshNodesResourcesRequest.newInstance();
|
||||||
|
RefreshNodesResourcesResponse response =
|
||||||
|
getRouterRMAdminService().refreshNodesResources(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<AddToClusterNodeLabelsResponse>() {
|
||||||
|
@Override
|
||||||
|
public AddToClusterNodeLabelsResponse run() throws Exception {
|
||||||
|
AddToClusterNodeLabelsRequest req =
|
||||||
|
AddToClusterNodeLabelsRequest.newInstance(null);
|
||||||
|
AddToClusterNodeLabelsResponse response =
|
||||||
|
getRouterRMAdminService().addToClusterNodeLabels(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||||
|
String user) throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||||
|
new PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse>() {
|
||||||
|
@Override
|
||||||
|
public RemoveFromClusterNodeLabelsResponse run() throws Exception {
|
||||||
|
RemoveFromClusterNodeLabelsRequest req =
|
||||||
|
RemoveFromClusterNodeLabelsRequest.newInstance(null);
|
||||||
|
RemoveFromClusterNodeLabelsResponse response =
|
||||||
|
getRouterRMAdminService().removeFromClusterNodeLabels(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse>() {
|
||||||
|
@Override
|
||||||
|
public ReplaceLabelsOnNodeResponse run() throws Exception {
|
||||||
|
ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
|
||||||
|
.newInstance(new HashMap<NodeId, Set<String>>());
|
||||||
|
ReplaceLabelsOnNodeResponse response =
|
||||||
|
getRouterRMAdminService().replaceLabelsOnNode(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||||
|
String user) throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||||
|
new PrivilegedExceptionAction<CheckForDecommissioningNodesResponse>() {
|
||||||
|
@Override
|
||||||
|
public CheckForDecommissioningNodesResponse run() throws Exception {
|
||||||
|
CheckForDecommissioningNodesRequest req =
|
||||||
|
CheckForDecommissioningNodesRequest.newInstance();
|
||||||
|
CheckForDecommissioningNodesResponse response =
|
||||||
|
getRouterRMAdminService().checkForDecommissioningNodes(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||||
|
String user) throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user).doAs(
|
||||||
|
new PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshClusterMaxPriorityResponse run() throws Exception {
|
||||||
|
RefreshClusterMaxPriorityRequest req =
|
||||||
|
RefreshClusterMaxPriorityRequest.newInstance();
|
||||||
|
RefreshClusterMaxPriorityResponse response =
|
||||||
|
getRouterRMAdminService().refreshClusterMaxPriority(req);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String[] getGroupsForUser(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return UserGroupInformation.createRemoteUser(user)
|
||||||
|
.doAs(new PrivilegedExceptionAction<String[]>() {
|
||||||
|
@Override
|
||||||
|
public String[] run() throws Exception {
|
||||||
|
String[] response =
|
||||||
|
getRouterRMAdminService().getGroupsForUser(user);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class mocks the RMAmdinRequestInterceptor.
|
||||||
|
*/
|
||||||
|
public class MockRMAdminRequestInterceptor
|
||||||
|
extends DefaultRMAdminRequestInterceptor {
|
||||||
|
|
||||||
|
public void init(String user) {
|
||||||
|
MockResourceManagerFacade mockRM = new MockResourceManagerFacade(
|
||||||
|
new YarnConfiguration(super.getConf()), 0);
|
||||||
|
super.setRMAdmin(mockRM);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,148 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mock intercepter that does not do anything other than forwarding it to the
|
||||||
|
* next intercepter in the chain.
|
||||||
|
*/
|
||||||
|
public class PassThroughRMAdminRequestInterceptor
|
||||||
|
extends AbstractRMAdminRequestInterceptor {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshQueues(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshNodes(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||||
|
RefreshSuperUserGroupsConfigurationRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshSuperUserGroupsConfiguration(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||||
|
RefreshUserToGroupsMappingsRequest request)
|
||||||
|
throws StandbyException, YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshUserToGroupsMappings(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||||
|
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshAdminAcls(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||||
|
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshServiceAcls(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UpdateNodeResourceResponse updateNodeResource(
|
||||||
|
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().updateNodeResource(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||||
|
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshNodesResources(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||||
|
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().addToClusterNodeLabels(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||||
|
RemoveFromClusterNodeLabelsRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().removeFromClusterNodeLabels(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||||
|
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().replaceLabelsOnNode(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||||
|
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return getNextInterceptor()
|
||||||
|
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||||
|
RefreshClusterMaxPriorityRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
return getNextInterceptor().refreshClusterMaxPriority(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getGroupsForUser(String user) throws IOException {
|
||||||
|
return getNextInterceptor().getGroupsForUser(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,219 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService.RequestInterceptorChainWrapper;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class to validate the RMAdmin Service inside the Router.
|
||||||
|
*/
|
||||||
|
public class TestRouterRMAdminService extends BaseRouterRMAdminTest {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestRouterRMAdminService.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests if the pipeline is created properly.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRequestInterceptorChainCreation() throws Exception {
|
||||||
|
RMAdminRequestInterceptor root =
|
||||||
|
super.getRouterRMAdminService().createRequestInterceptorChain();
|
||||||
|
int index = 0;
|
||||||
|
while (root != null) {
|
||||||
|
// The current pipeline is:
|
||||||
|
// PassThroughRMAdminRequestInterceptor - index = 0
|
||||||
|
// PassThroughRMAdminRequestInterceptor - index = 1
|
||||||
|
// PassThroughRMAdminRequestInterceptor - index = 2
|
||||||
|
// MockClientRequestInterceptor - index = 3
|
||||||
|
switch (index) {
|
||||||
|
case 0: // Fall to the next case
|
||||||
|
case 1: // Fall to the next case
|
||||||
|
case 2:
|
||||||
|
// If index is equal to 0,1 or 2 we fall in this check
|
||||||
|
Assert.assertEquals(
|
||||||
|
PassThroughRMAdminRequestInterceptor.class.getName(),
|
||||||
|
root.getClass().getName());
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
Assert.assertEquals(MockRMAdminRequestInterceptor.class.getName(),
|
||||||
|
root.getClass().getName());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
root = root.getNextInterceptor();
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
Assert.assertEquals("The number of interceptors in chain does not match", 4,
|
||||||
|
index);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if the RouterRMAdmin forwards all the requests to the MockRM and get
|
||||||
|
* back the responses.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRouterRMAdminServiceE2E() throws Exception {
|
||||||
|
|
||||||
|
String user = "test1";
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Queues");
|
||||||
|
|
||||||
|
RefreshQueuesResponse responseRefreshQueues = refreshQueues(user);
|
||||||
|
Assert.assertNotNull(responseRefreshQueues);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes");
|
||||||
|
|
||||||
|
RefreshNodesResponse responseRefreshNodes = refreshNodes(user);
|
||||||
|
Assert.assertNotNull(responseRefreshNodes);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Super User");
|
||||||
|
|
||||||
|
RefreshSuperUserGroupsConfigurationResponse responseRefreshSuperUser =
|
||||||
|
refreshSuperUserGroupsConfiguration(user);
|
||||||
|
Assert.assertNotNull(responseRefreshSuperUser);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh User to Group");
|
||||||
|
|
||||||
|
RefreshUserToGroupsMappingsResponse responseRefreshUserToGroup =
|
||||||
|
refreshUserToGroupsMappings(user);
|
||||||
|
Assert.assertNotNull(responseRefreshUserToGroup);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Admin Acls");
|
||||||
|
|
||||||
|
RefreshAdminAclsResponse responseRefreshAdminAcls = refreshAdminAcls(user);
|
||||||
|
Assert.assertNotNull(responseRefreshAdminAcls);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Service Acls");
|
||||||
|
|
||||||
|
RefreshServiceAclsResponse responseRefreshServiceAcls =
|
||||||
|
refreshServiceAcls(user);
|
||||||
|
Assert.assertNotNull(responseRefreshServiceAcls);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Update Node Resource");
|
||||||
|
|
||||||
|
UpdateNodeResourceResponse responseUpdateNodeResource =
|
||||||
|
updateNodeResource(user);
|
||||||
|
Assert.assertNotNull(responseUpdateNodeResource);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes Resource");
|
||||||
|
|
||||||
|
RefreshNodesResourcesResponse responseRefreshNodesResources =
|
||||||
|
refreshNodesResources(user);
|
||||||
|
Assert.assertNotNull(responseRefreshNodesResources);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Add To Cluster NodeLabels");
|
||||||
|
|
||||||
|
AddToClusterNodeLabelsResponse responseAddToClusterNodeLabels =
|
||||||
|
addToClusterNodeLabels(user);
|
||||||
|
Assert.assertNotNull(responseAddToClusterNodeLabels);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Remove To Cluster NodeLabels");
|
||||||
|
|
||||||
|
RemoveFromClusterNodeLabelsResponse responseRemoveFromClusterNodeLabels =
|
||||||
|
removeFromClusterNodeLabels(user);
|
||||||
|
Assert.assertNotNull(responseRemoveFromClusterNodeLabels);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Replace Labels On Node");
|
||||||
|
|
||||||
|
ReplaceLabelsOnNodeResponse responseReplaceLabelsOnNode =
|
||||||
|
replaceLabelsOnNode(user);
|
||||||
|
Assert.assertNotNull(responseReplaceLabelsOnNode);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Check For Decommissioning Nodes");
|
||||||
|
|
||||||
|
CheckForDecommissioningNodesResponse responseCheckForDecom =
|
||||||
|
checkForDecommissioningNodes(user);
|
||||||
|
Assert.assertNotNull(responseCheckForDecom);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Refresh Cluster Max Priority");
|
||||||
|
|
||||||
|
RefreshClusterMaxPriorityResponse responseRefreshClusterMaxPriority =
|
||||||
|
refreshClusterMaxPriority(user);
|
||||||
|
Assert.assertNotNull(responseRefreshClusterMaxPriority);
|
||||||
|
|
||||||
|
LOG.info("testRouterRMAdminServiceE2E - Get Groups For User");
|
||||||
|
|
||||||
|
String[] responseGetGroupsForUser = getGroupsForUser(user);
|
||||||
|
Assert.assertNotNull(responseGetGroupsForUser);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if the different chains for users are generated, and LRU cache is
|
||||||
|
* working as expected.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUsersChainMapWithLRUCache()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
|
||||||
|
Map<String, RequestInterceptorChainWrapper> pipelines;
|
||||||
|
RequestInterceptorChainWrapper chain;
|
||||||
|
|
||||||
|
refreshQueues("test1");
|
||||||
|
refreshQueues("test2");
|
||||||
|
refreshQueues("test3");
|
||||||
|
refreshQueues("test4");
|
||||||
|
refreshQueues("test5");
|
||||||
|
refreshQueues("test6");
|
||||||
|
refreshQueues("test7");
|
||||||
|
refreshQueues("test8");
|
||||||
|
|
||||||
|
pipelines = super.getRouterRMAdminService().getPipelines();
|
||||||
|
Assert.assertEquals(8, pipelines.size());
|
||||||
|
|
||||||
|
refreshQueues("test9");
|
||||||
|
refreshQueues("test10");
|
||||||
|
refreshQueues("test1");
|
||||||
|
refreshQueues("test11");
|
||||||
|
|
||||||
|
// The cache max size is defined in
|
||||||
|
// BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE
|
||||||
|
Assert.assertEquals(10, pipelines.size());
|
||||||
|
|
||||||
|
chain = pipelines.get("test1");
|
||||||
|
Assert.assertNotNull("test1 should not be evicted", chain);
|
||||||
|
|
||||||
|
chain = pipelines.get("test2");
|
||||||
|
Assert.assertNull("test2 should have been evicted", chain);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue