YARN-5413. Create a proxy chain for ResourceManager Admin API in the Router. (Giovanni Matteo Fumarola via Subru).

(cherry picked from commit 67846a5519)
(cherry picked from commit 7444406d6d)
This commit is contained in:
Subru Krishnan 2017-05-09 19:19:27 -07:00 committed by Carlo Curino
parent dc0a2e6c59
commit 7f00f938f2
18 changed files with 1635 additions and 20 deletions

View File

@ -2586,6 +2586,8 @@ public class YarnConfiguration extends Configuration {
public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
public static final String ROUTER_CLIENTRM_PREFIX = public static final String ROUTER_CLIENTRM_PREFIX =
ROUTER_PREFIX + "clientrm."; ROUTER_PREFIX + "clientrm.";
@ -2601,9 +2603,23 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.clientrm." "org.apache.hadoop.yarn.server.router.clientrm."
+ "DefaultClientRequestInterceptor"; + "DefaultClientRequestInterceptor";
public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = public static final String ROUTER_PIPELINE_CACHE_MAX_SIZE =
ROUTER_CLIENTRM_PREFIX + "cache-max-size"; ROUTER_PREFIX + "pipeline.cache-max-size";
public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25; public static final int DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE = 25;
public static final String ROUTER_RMADMIN_PREFIX = ROUTER_PREFIX + "rmadmin.";
public static final String ROUTER_RMADMIN_ADDRESS =
ROUTER_RMADMIN_PREFIX + ".address";
public static final int DEFAULT_ROUTER_RMADMIN_PORT = 8052;
public static final String DEFAULT_ROUTER_RMADMIN_ADDRESS =
"0.0.0.0:" + DEFAULT_ROUTER_RMADMIN_PORT;
public static final String ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE =
ROUTER_RMADMIN_PREFIX + "interceptor-class.pipeline";
public static final String DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS =
"org.apache.hadoop.yarn.server.router.rmadmin."
+ "DefaultRMAdminRequestInterceptor";
//////////////////////////////// ////////////////////////////////
// Other Configs // Other Configs

View File

@ -3132,4 +3132,25 @@
<value>25</value> <value>25</value>
</property> </property>
<property>
<description>
The comma separated list of class names that implement the
RequestInterceptor interface. This is used by the RouterRMAdminService
to create the request processing pipeline for users.
</description>
<name>yarn.router.rmadmin.interceptor-class.pipeline</name>
<value>org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor</value>
</property>
<property>
<description>
The actual address the server will bind to. If this optional address is
set, the RPC and webapp servers will bind to this address and the port specified in
yarn.router.address and yarn.router.webapp.address, respectively. This is
most useful for making Router listen to all interfaces by setting to 0.0.0.0.
</description>
<name>yarn.router.bind-host</name>
<value></value>
</property>
</configuration> </configuration>

View File

@ -24,7 +24,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
/** /**
* Test class to validate the correctness of the LRUCacheHashMap. * Test class to validate the correctness of the {@code LRUCacheHashMap}.
* *
*/ */
public class TestLRUCacheHashMap { public class TestLRUCacheHashMap {

View File

@ -124,8 +124,11 @@ import org.mortbay.log.Log;
* implementation is expected by the unit test cases. So please change the * implementation is expected by the unit test cases. So please change the
* implementation with care. * implementation with care.
*/ */
public class MockResourceManagerFacade implements public class MockResourceManagerFacade
ApplicationMasterProtocol, ApplicationClientProtocol { implements ApplicationClientProtocol, ApplicationMasterProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(MockResourceManagerFacade.class);
private HashMap<String, List<ContainerId>> applicationContainerIdMap = private HashMap<String, List<ContainerId>> applicationContainerIdMap =
new HashMap<String, List<ContainerId>>(); new HashMap<String, List<ContainerId>>();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,6 +55,7 @@ public class Router extends CompositeService {
private Configuration conf; private Configuration conf;
private AtomicBoolean isStopping = new AtomicBoolean(false); private AtomicBoolean isStopping = new AtomicBoolean(false);
private RouterClientRMService clientRMProxyService; private RouterClientRMService clientRMProxyService;
private RouterRMAdminService rmAdminProxyService;
/** /**
* Priority of the Router shutdown hook. * Priority of the Router shutdown hook.
@ -71,8 +73,12 @@ public class Router extends CompositeService {
@Override @Override
protected void serviceInit(Configuration config) throws Exception { protected void serviceInit(Configuration config) throws Exception {
this.conf = config; this.conf = config;
// ClientRM Proxy
clientRMProxyService = createClientRMProxyService(); clientRMProxyService = createClientRMProxyService();
addService(clientRMProxyService); addService(clientRMProxyService);
// RMAdmin Proxy
rmAdminProxyService = createRMAdminProxyService();
addService(rmAdminProxyService);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -107,6 +113,10 @@ public class Router extends CompositeService {
return new RouterClientRMService(); return new RouterClientRMService();
} }
protected RouterRMAdminService createRMAdminProxyService() {
return new RouterRMAdminService();
}
public static void main(String[] argv) { public static void main(String[] argv) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
Thread Thread

View File

@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.router.clientrm;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
/** /**
* Implements the RequestInterceptor interface and provides common functionality * Implements the {@link ClientRequestInterceptor} interface and provides common
* which can can be used and/or extended by other concrete intercepter classes. * functionality which can can be used and/or extended by other concrete
* intercepter classes.
* *
*/ */
public abstract class AbstractClientRequestInterceptor public abstract class AbstractClientRequestInterceptor
@ -31,7 +32,7 @@ public abstract class AbstractClientRequestInterceptor
private ClientRequestInterceptor nextInterceptor; private ClientRequestInterceptor nextInterceptor;
/** /**
* Sets the {@code RequestInterceptor} in the chain. * Sets the {@link ClientRequestInterceptor} in the chain.
*/ */
@Override @Override
public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) { public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
@ -59,7 +60,7 @@ public abstract class AbstractClientRequestInterceptor
} }
/** /**
* Initializes the {@code ClientRequestInterceptor}. * Initializes the {@link ClientRequestInterceptor}.
*/ */
@Override @Override
public void init(String user) { public void init(String user) {
@ -69,7 +70,7 @@ public abstract class AbstractClientRequestInterceptor
} }
/** /**
* Disposes the {@code ClientRequestInterceptor}. * Disposes the {@link ClientRequestInterceptor}.
*/ */
@Override @Override
public void shutdown() { public void shutdown() {

View File

@ -91,7 +91,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
/** /**
* Extends the AbstractRequestInterceptorClient class and provides an * Extends the {@code AbstractRequestInterceptorClient} class and provides an
* implementation that simply forwards the client requests to the cluster * implementation that simply forwards the client requests to the cluster
* resource manager. * resource manager.
* *

View File

@ -104,10 +104,11 @@ import com.google.common.annotations.VisibleForTesting;
/** /**
* RouterClientRMService is a service that runs on each router that can be used * RouterClientRMService is a service that runs on each router that can be used
* to intercept and inspect ApplicationClientProtocol messages from client to * to intercept and inspect {@link ApplicationClientProtocol} messages from
* the cluster resource manager. It listens ApplicationClientProtocol messages * client to the cluster resource manager. It listens
* from the client and creates a request intercepting pipeline instance for each * {@link ApplicationClientProtocol} messages from the client and creates a
* client. The pipeline is a chain of intercepter instances that can inspect and * request intercepting pipeline instance for each client. The pipeline is a
* chain of {@link ClientRequestInterceptor} instances that can inspect and
* modify the request/response as needed. The main difference with * modify the request/response as needed. The main difference with
* AMRMProxyService is the protocol they implement. * AMRMProxyService is the protocol they implement.
*/ */
@ -137,13 +138,14 @@ public class RouterClientRMService extends AbstractService
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
this.listenerEndpoint = this.listenerEndpoint =
conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS, YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT); YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
int maxCacheSize = int maxCacheSize =
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE); YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap( this.userPipelineMap = Collections.synchronizedMap(
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>( new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
maxCacheSize, true)); maxCacheSize, true));

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.hadoop.conf.Configuration;
/**
* Implements the {@link RMAdminRequestInterceptor} interface and provides
* common functionality which can can be used and/or extended by other concrete
* intercepter classes.
*
*/
public abstract class AbstractRMAdminRequestInterceptor
implements RMAdminRequestInterceptor {
private Configuration conf;
private RMAdminRequestInterceptor nextInterceptor;
/**
* Sets the {@link RMAdminRequestInterceptor} in the chain.
*/
@Override
public void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor) {
this.nextInterceptor = nextInterceptor;
}
/**
* Sets the {@link Configuration}.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
if (this.nextInterceptor != null) {
this.nextInterceptor.setConf(conf);
}
}
/**
* Gets the {@link Configuration}.
*/
@Override
public Configuration getConf() {
return this.conf;
}
/**
* Initializes the {@link RMAdminRequestInterceptor}.
*/
@Override
public void init(String user) {
if (this.nextInterceptor != null) {
this.nextInterceptor.init(user);
}
}
/**
* Disposes the {@link RMAdminRequestInterceptor}.
*/
@Override
public void shutdown() {
if (this.nextInterceptor != null) {
this.nextInterceptor.shutdown();
}
}
/**
* Gets the next {@link RMAdminRequestInterceptor} in the chain.
*/
@Override
public RMAdminRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}
}

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an
* implementation that simply forwards the client requests to the cluster
* resource manager.
*
*/
public class DefaultRMAdminRequestInterceptor
extends AbstractRMAdminRequestInterceptor {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultRMAdminRequestInterceptor.class);
private ResourceManagerAdministrationProtocol rmAdminProxy;
private UserGroupInformation user = null;
@Override
public void init(String userName) {
super.init(userName);
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
final Configuration conf = this.getConf();
rmAdminProxy = user.doAs(
new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
@Override
public ResourceManagerAdministrationProtocol run()
throws Exception {
return ClientRMProxy.createRMProxy(conf,
ResourceManagerAdministrationProtocol.class);
}
});
} catch (IOException e) {
String message = "Error while creating Router RMAdmin Service for user:";
if (user != null) {
message += ", user: " + user;
}
LOG.info(message);
throw new YarnRuntimeException(message, e);
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
@Override
public void setNextInterceptor(RMAdminRequestInterceptor next) {
throw new YarnRuntimeException("setNextInterceptor is being called on "
+ "DefaultRMAdminRequestInterceptor, which should be the last one "
+ "in the chain. Check if the interceptor pipeline configuration "
+ "is correct");
}
@VisibleForTesting
public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
this.rmAdminProxy = rmAdmin;
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException {
return rmAdminProxy.refreshQueues(request);
}
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException {
return rmAdminProxy.refreshNodes(request);
}
@Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException {
return rmAdminProxy.refreshSuperUserGroupsConfiguration(request);
}
@Override
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException {
return rmAdminProxy.refreshUserToGroupsMappings(request);
}
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
return rmAdminProxy.refreshAdminAcls(request);
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException, IOException {
return rmAdminProxy.refreshServiceAcls(request);
}
@Override
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request) throws YarnException, IOException {
return rmAdminProxy.updateNodeResource(request);
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException {
return rmAdminProxy.refreshNodesResources(request);
}
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
return rmAdminProxy.addToClusterNodeLabels(request);
}
@Override
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request)
throws YarnException, IOException {
return rmAdminProxy.removeFromClusterNodeLabels(request);
}
@Override
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
return rmAdminProxy.replaceLabelsOnNode(request);
}
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
return rmAdminProxy
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
}
@Override
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request)
throws YarnException, IOException {
return rmAdminProxy.refreshClusterMaxPriority(request);
}
@Override
public String[] getGroupsForUser(String userName) throws IOException {
return rmAdminProxy.getGroupsForUser(userName);
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
/**
* Defines the contract to be implemented by the request intercepter classes,
* that can be used to intercept and inspect messages sent from the client to
* the resource manager.
*/
public interface RMAdminRequestInterceptor
extends ResourceManagerAdministrationProtocol, Configurable {
/**
* This method is called for initializing the intercepter. This is guaranteed
* to be called only once in the lifetime of this instance.
*
* @param user the name of the client
*/
void init(String user);
/**
* This method is called to release the resources held by the intercepter.
* This will be called when the application pipeline is being destroyed. The
* concrete implementations should dispose the resources and forward the
* request to the next intercepter, if any.
*/
void shutdown();
/**
* Sets the next intercepter in the pipeline. The concrete implementation of
* this interface should always pass the request to the nextInterceptor after
* inspecting the message. The last intercepter in the chain is responsible to
* send the messages to the resource manager service and so the last
* intercepter will not receive this method call.
*
* @param nextInterceptor the RMAdminRequestInterceptor to set in the pipeline
*/
void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor);
/**
* Returns the next intercepter in the chain.
*
* @return the next intercepter in the chain
*/
RMAdminRequestInterceptor getNextInterceptor();
}

View File

@ -0,0 +1,423 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* RouterRMAdminService is a service that runs on each router that can be used
* to intercept and inspect {@code ResourceManagerAdministrationProtocol}
* messages from client to the cluster resource manager. It listens
* {@code ResourceManagerAdministrationProtocol} messages from the client and
* creates a request intercepting pipeline instance for each client. The
* pipeline is a chain of intercepter instances that can inspect and modify the
* request/response as needed. The main difference with AMRMProxyService is the
* protocol they implement.
*/
public class RouterRMAdminService extends AbstractService
implements ResourceManagerAdministrationProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RouterRMAdminService.class);
private Server server;
private InetSocketAddress listenerEndpoint;
// For each user we store an interceptors' pipeline.
// For performance issue we use LRU cache to keep in memory the newest ones
// and remove the oldest used ones.
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
public RouterRMAdminService() {
super(RouterRMAdminService.class.getName());
}
@Override
protected void serviceStart() throws Exception {
LOG.info("Starting Router RMAdmin Service");
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation.setConfiguration(conf);
this.listenerEndpoint =
conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
YarnConfiguration.ROUTER_RMADMIN_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_PORT);
int maxCacheSize =
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
maxCacheSize, true));
Configuration serverConf = new Configuration(conf);
int numWorkerThreads =
serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT);
this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
this, listenerEndpoint, serverConf, null, numWorkerThreads);
this.server.start();
LOG.info("Router RMAdminService listening on address: "
+ this.server.getListenerAddress());
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping Router RMAdminService");
if (this.server != null) {
this.server.stop();
}
userPipelineMap.clear();
super.serviceStop();
}
/**
* Returns the comma separated intercepter class names from the configuration.
*
* @param conf
* @return the intercepter class names as an instance of ArrayList
*/
private List<String> getInterceptorClassNames(Configuration conf) {
String configuredInterceptorClassNames =
conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
List<String> interceptorClassNames = new ArrayList<String>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
interceptorClassNames.add(item.trim());
}
return interceptorClassNames;
}
private RequestInterceptorChainWrapper getInterceptorChain()
throws IOException {
String user = UserGroupInformation.getCurrentUser().getUserName();
if (!userPipelineMap.containsKey(user)) {
initializePipeline(user);
}
return userPipelineMap.get(user);
}
/**
* Gets the Request intercepter chains for all the users.
*
* @return the request intercepter chains.
*/
@VisibleForTesting
protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
return this.userPipelineMap;
}
/**
* This method creates and returns reference of the first intercepter in the
* chain of request intercepter instances.
*
* @return the reference of the first intercepter in the chain
*/
@VisibleForTesting
protected RMAdminRequestInterceptor createRequestInterceptorChain() {
Configuration conf = getConfig();
List<String> interceptorClassNames = getInterceptorClassNames(conf);
RMAdminRequestInterceptor pipeline = null;
RMAdminRequestInterceptor current = null;
for (String interceptorClassName : interceptorClassNames) {
try {
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
if (RMAdminRequestInterceptor.class
.isAssignableFrom(interceptorClass)) {
RMAdminRequestInterceptor interceptorInstance =
(RMAdminRequestInterceptor) ReflectionUtils
.newInstance(interceptorClass, conf);
if (pipeline == null) {
pipeline = interceptorInstance;
current = interceptorInstance;
continue;
} else {
current.setNextInterceptor(interceptorInstance);
current = interceptorInstance;
}
} else {
throw new YarnRuntimeException(
"Class: " + interceptorClassName + " not instance of "
+ RMAdminRequestInterceptor.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate RMAdminRequestInterceptor: "
+ interceptorClassName,
e);
}
}
if (pipeline == null) {
throw new YarnRuntimeException(
"RequestInterceptor pipeline is not configured in the system");
}
return pipeline;
}
/**
* Initializes the request intercepter pipeline for the specified user.
*
* @param user
*/
private void initializePipeline(String user) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (this.userPipelineMap) {
if (this.userPipelineMap.containsKey(user)) {
LOG.info("Request to start an already existing user: {}"
+ " was received, so ignoring.", user);
return;
}
chainWrapper = new RequestInterceptorChainWrapper();
this.userPipelineMap.put(user, chainWrapper);
}
// We register the pipeline instance in the map first and then initialize it
// later because chain initialization can be expensive and we would like to
// release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for the user: {}", user);
try {
RMAdminRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
synchronized (this.userPipelineMap) {
this.userPipelineMap.remove(user);
}
throw e;
}
}
/**
* Private structure for encapsulating RequestInterceptor and user instances.
*
*/
@Private
public static class RequestInterceptorChainWrapper {
private RMAdminRequestInterceptor rootInterceptor;
/**
* Initializes the wrapper with the specified parameters.
*
* @param interceptor the first interceptor in the pipeline
*/
public synchronized void init(RMAdminRequestInterceptor interceptor) {
this.rootInterceptor = interceptor;
}
/**
* Gets the root request intercepter.
*
* @return the root request intercepter
*/
public synchronized RMAdminRequestInterceptor getRootInterceptor() {
return rootInterceptor;
}
/**
* Shutdown the chain of interceptors when the object is destroyed.
*/
@Override
protected void finalize() {
rootInterceptor.shutdown();
}
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().getGroupsForUser(user);
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshQueues(request);
}
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshNodes(request);
}
@Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor()
.refreshSuperUserGroupsConfiguration(request);
}
@Override
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshUserToGroupsMappings(request);
}
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshAdminAcls(request);
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshServiceAcls(request);
}
@Override
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().updateNodeResource(request);
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshNodesResources(request);
}
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().addToClusterNodeLabels(request);
}
@Override
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request)
throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().removeFromClusterNodeLabels(request);
}
@Override
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().replaceLabelsOnNode(request);
}
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor()
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
}
@Override
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request)
throws YarnException, IOException {
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().refreshClusterMaxPriority(request);
}
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Router RM Admin Proxy Service package. **/
package org.apache.hadoop.yarn.server.router.rmadmin;

View File

@ -133,7 +133,7 @@ public abstract class BaseRouterClientRMTest {
+ "," + mockPassThroughInterceptorClass + "," + "," + mockPassThroughInterceptorClass + ","
+ MockClientRequestInterceptor.class.getName()); + MockClientRequestInterceptor.class.getName());
this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE); TEST_MAX_CACHE_SIZE);
this.dispatcher = new AsyncDispatcher(); this.dispatcher = new AsyncDispatcher();

View File

@ -0,0 +1,346 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
/**
* Base class for all the RouterRMAdminService test cases. It provides utility
* methods that can be used by the concrete test case classes.
*
*/
public abstract class BaseRouterRMAdminTest {
/**
* The RouterRMAdminService instance that will be used by all the test cases.
*/
private MockRouterRMAdminService rmAdminService;
/**
* Thread pool used for asynchronous operations.
*/
private static ExecutorService threadpool = Executors.newCachedThreadPool();
private Configuration conf;
private AsyncDispatcher dispatcher;
public final static int TEST_MAX_CACHE_SIZE = 10;
protected MockRouterRMAdminService getRouterRMAdminService() {
Assert.assertNotNull(this.rmAdminService);
return this.rmAdminService;
}
@Before
public void setUp() {
this.conf = new YarnConfiguration();
String mockPassThroughInterceptorClass =
PassThroughRMAdminRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockRMAdminRequestInterceptor.class.getName());
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
this.dispatcher = new AsyncDispatcher();
this.dispatcher.init(conf);
this.dispatcher.start();
this.rmAdminService = createAndStartRouterRMAdminService();
}
@After
public void tearDown() {
if (rmAdminService != null) {
rmAdminService.stop();
rmAdminService = null;
}
if (this.dispatcher != null) {
this.dispatcher.stop();
}
}
protected ExecutorService getThreadPool() {
return threadpool;
}
protected MockRouterRMAdminService createAndStartRouterRMAdminService() {
MockRouterRMAdminService svc = new MockRouterRMAdminService();
svc.init(conf);
svc.start();
return svc;
}
protected static class MockRouterRMAdminService extends RouterRMAdminService {
public MockRouterRMAdminService() {
super();
}
}
protected RefreshQueuesResponse refreshQueues(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<RefreshQueuesResponse>() {
@Override
public RefreshQueuesResponse run() throws Exception {
RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
RefreshQueuesResponse response =
getRouterRMAdminService().refreshQueues(req);
return response;
}
});
}
protected RefreshNodesResponse refreshNodes(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<RefreshNodesResponse>() {
@Override
public RefreshNodesResponse run() throws Exception {
RefreshNodesRequest req = RefreshNodesRequest.newInstance();
RefreshNodesResponse response =
getRouterRMAdminService().refreshNodes(req);
return response;
}
});
}
protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
String user) throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse>() {
@Override
public RefreshSuperUserGroupsConfigurationResponse run()
throws Exception {
RefreshSuperUserGroupsConfigurationRequest req =
RefreshSuperUserGroupsConfigurationRequest.newInstance();
RefreshSuperUserGroupsConfigurationResponse response =
getRouterRMAdminService()
.refreshSuperUserGroupsConfiguration(req);
return response;
}
});
}
protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
String user) throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse>() {
@Override
public RefreshUserToGroupsMappingsResponse run() throws Exception {
RefreshUserToGroupsMappingsRequest req =
RefreshUserToGroupsMappingsRequest.newInstance();
RefreshUserToGroupsMappingsResponse response =
getRouterRMAdminService().refreshUserToGroupsMappings(req);
return response;
}
});
}
protected RefreshAdminAclsResponse refreshAdminAcls(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<RefreshAdminAclsResponse>() {
@Override
public RefreshAdminAclsResponse run() throws Exception {
RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
RefreshAdminAclsResponse response =
getRouterRMAdminService().refreshAdminAcls(req);
return response;
}
});
}
protected RefreshServiceAclsResponse refreshServiceAcls(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<RefreshServiceAclsResponse>() {
@Override
public RefreshServiceAclsResponse run() throws Exception {
RefreshServiceAclsRequest req =
RefreshServiceAclsRequest.newInstance();
RefreshServiceAclsResponse response =
getRouterRMAdminService().refreshServiceAcls(req);
return response;
}
});
}
protected UpdateNodeResourceResponse updateNodeResource(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<UpdateNodeResourceResponse>() {
@Override
public UpdateNodeResourceResponse run() throws Exception {
UpdateNodeResourceRequest req =
UpdateNodeResourceRequest.newInstance(null);
UpdateNodeResourceResponse response =
getRouterRMAdminService().updateNodeResource(req);
return response;
}
});
}
protected RefreshNodesResourcesResponse refreshNodesResources(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<RefreshNodesResourcesResponse>() {
@Override
public RefreshNodesResourcesResponse run() throws Exception {
RefreshNodesResourcesRequest req =
RefreshNodesResourcesRequest.newInstance();
RefreshNodesResourcesResponse response =
getRouterRMAdminService().refreshNodesResources(req);
return response;
}
});
}
protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<AddToClusterNodeLabelsResponse>() {
@Override
public AddToClusterNodeLabelsResponse run() throws Exception {
AddToClusterNodeLabelsRequest req =
AddToClusterNodeLabelsRequest.newInstance(null);
AddToClusterNodeLabelsResponse response =
getRouterRMAdminService().addToClusterNodeLabels(req);
return response;
}
});
}
protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
String user) throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse>() {
@Override
public RemoveFromClusterNodeLabelsResponse run() throws Exception {
RemoveFromClusterNodeLabelsRequest req =
RemoveFromClusterNodeLabelsRequest.newInstance(null);
RemoveFromClusterNodeLabelsResponse response =
getRouterRMAdminService().removeFromClusterNodeLabels(req);
return response;
}
});
}
protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse>() {
@Override
public ReplaceLabelsOnNodeResponse run() throws Exception {
ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
.newInstance(new HashMap<NodeId, Set<String>>());
ReplaceLabelsOnNodeResponse response =
getRouterRMAdminService().replaceLabelsOnNode(req);
return response;
}
});
}
protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
String user) throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<CheckForDecommissioningNodesResponse>() {
@Override
public CheckForDecommissioningNodesResponse run() throws Exception {
CheckForDecommissioningNodesRequest req =
CheckForDecommissioningNodesRequest.newInstance();
CheckForDecommissioningNodesResponse response =
getRouterRMAdminService().checkForDecommissioningNodes(req);
return response;
}
});
}
protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
String user) throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user).doAs(
new PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse>() {
@Override
public RefreshClusterMaxPriorityResponse run() throws Exception {
RefreshClusterMaxPriorityRequest req =
RefreshClusterMaxPriorityRequest.newInstance();
RefreshClusterMaxPriorityResponse response =
getRouterRMAdminService().refreshClusterMaxPriority(req);
return response;
}
});
}
protected String[] getGroupsForUser(String user)
throws IOException, InterruptedException {
return UserGroupInformation.createRemoteUser(user)
.doAs(new PrivilegedExceptionAction<String[]>() {
@Override
public String[] run() throws Exception {
String[] response =
getRouterRMAdminService().getGroupsForUser(user);
return response;
}
});
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
/**
* This class mocks the RMAmdinRequestInterceptor.
*/
public class MockRMAdminRequestInterceptor
extends DefaultRMAdminRequestInterceptor {
public void init(String user) {
MockResourceManagerFacade mockRM = new MockResourceManagerFacade(
new YarnConfiguration(super.getConf()), 0);
super.setRMAdmin(mockRM);
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import java.io.IOException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
/**
* Mock intercepter that does not do anything other than forwarding it to the
* next intercepter in the chain.
*/
public class PassThroughRMAdminRequestInterceptor
extends AbstractRMAdminRequestInterceptor {
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException {
return getNextInterceptor().refreshQueues(request);
}
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException {
return getNextInterceptor().refreshNodes(request);
}
@Override
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException {
return getNextInterceptor().refreshSuperUserGroupsConfiguration(request);
}
@Override
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException {
return getNextInterceptor().refreshUserToGroupsMappings(request);
}
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
return getNextInterceptor().refreshAdminAcls(request);
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException, IOException {
return getNextInterceptor().refreshServiceAcls(request);
}
@Override
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request) throws YarnException, IOException {
return getNextInterceptor().updateNodeResource(request);
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException {
return getNextInterceptor().refreshNodesResources(request);
}
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
return getNextInterceptor().addToClusterNodeLabels(request);
}
@Override
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request)
throws YarnException, IOException {
return getNextInterceptor().removeFromClusterNodeLabels(request);
}
@Override
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
return getNextInterceptor().replaceLabelsOnNode(request);
}
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
return getNextInterceptor()
.checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
}
@Override
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request)
throws YarnException, IOException {
return getNextInterceptor().refreshClusterMaxPriority(request);
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
return getNextInterceptor().getGroupsForUser(user);
}
}

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.rmadmin;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService.RequestInterceptorChainWrapper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test class to validate the RMAdmin Service inside the Router.
*/
public class TestRouterRMAdminService extends BaseRouterRMAdminTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterRMAdminService.class);
/**
* Tests if the pipeline is created properly.
*/
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RMAdminRequestInterceptor root =
super.getRouterRMAdminService().createRequestInterceptorChain();
int index = 0;
while (root != null) {
// The current pipeline is:
// PassThroughRMAdminRequestInterceptor - index = 0
// PassThroughRMAdminRequestInterceptor - index = 1
// PassThroughRMAdminRequestInterceptor - index = 2
// MockClientRequestInterceptor - index = 3
switch (index) {
case 0: // Fall to the next case
case 1: // Fall to the next case
case 2:
// If index is equal to 0,1 or 2 we fall in this check
Assert.assertEquals(
PassThroughRMAdminRequestInterceptor.class.getName(),
root.getClass().getName());
break;
case 3:
Assert.assertEquals(MockRMAdminRequestInterceptor.class.getName(),
root.getClass().getName());
break;
default:
Assert.fail();
}
root = root.getNextInterceptor();
index++;
}
Assert.assertEquals("The number of interceptors in chain does not match", 4,
index);
}
/**
* Test if the RouterRMAdmin forwards all the requests to the MockRM and get
* back the responses.
*/
@Test
public void testRouterRMAdminServiceE2E() throws Exception {
String user = "test1";
LOG.info("testRouterRMAdminServiceE2E - Refresh Queues");
RefreshQueuesResponse responseRefreshQueues = refreshQueues(user);
Assert.assertNotNull(responseRefreshQueues);
LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes");
RefreshNodesResponse responseRefreshNodes = refreshNodes(user);
Assert.assertNotNull(responseRefreshNodes);
LOG.info("testRouterRMAdminServiceE2E - Refresh Super User");
RefreshSuperUserGroupsConfigurationResponse responseRefreshSuperUser =
refreshSuperUserGroupsConfiguration(user);
Assert.assertNotNull(responseRefreshSuperUser);
LOG.info("testRouterRMAdminServiceE2E - Refresh User to Group");
RefreshUserToGroupsMappingsResponse responseRefreshUserToGroup =
refreshUserToGroupsMappings(user);
Assert.assertNotNull(responseRefreshUserToGroup);
LOG.info("testRouterRMAdminServiceE2E - Refresh Admin Acls");
RefreshAdminAclsResponse responseRefreshAdminAcls = refreshAdminAcls(user);
Assert.assertNotNull(responseRefreshAdminAcls);
LOG.info("testRouterRMAdminServiceE2E - Refresh Service Acls");
RefreshServiceAclsResponse responseRefreshServiceAcls =
refreshServiceAcls(user);
Assert.assertNotNull(responseRefreshServiceAcls);
LOG.info("testRouterRMAdminServiceE2E - Update Node Resource");
UpdateNodeResourceResponse responseUpdateNodeResource =
updateNodeResource(user);
Assert.assertNotNull(responseUpdateNodeResource);
LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes Resource");
RefreshNodesResourcesResponse responseRefreshNodesResources =
refreshNodesResources(user);
Assert.assertNotNull(responseRefreshNodesResources);
LOG.info("testRouterRMAdminServiceE2E - Add To Cluster NodeLabels");
AddToClusterNodeLabelsResponse responseAddToClusterNodeLabels =
addToClusterNodeLabels(user);
Assert.assertNotNull(responseAddToClusterNodeLabels);
LOG.info("testRouterRMAdminServiceE2E - Remove To Cluster NodeLabels");
RemoveFromClusterNodeLabelsResponse responseRemoveFromClusterNodeLabels =
removeFromClusterNodeLabels(user);
Assert.assertNotNull(responseRemoveFromClusterNodeLabels);
LOG.info("testRouterRMAdminServiceE2E - Replace Labels On Node");
ReplaceLabelsOnNodeResponse responseReplaceLabelsOnNode =
replaceLabelsOnNode(user);
Assert.assertNotNull(responseReplaceLabelsOnNode);
LOG.info("testRouterRMAdminServiceE2E - Check For Decommissioning Nodes");
CheckForDecommissioningNodesResponse responseCheckForDecom =
checkForDecommissioningNodes(user);
Assert.assertNotNull(responseCheckForDecom);
LOG.info("testRouterRMAdminServiceE2E - Refresh Cluster Max Priority");
RefreshClusterMaxPriorityResponse responseRefreshClusterMaxPriority =
refreshClusterMaxPriority(user);
Assert.assertNotNull(responseRefreshClusterMaxPriority);
LOG.info("testRouterRMAdminServiceE2E - Get Groups For User");
String[] responseGetGroupsForUser = getGroupsForUser(user);
Assert.assertNotNull(responseGetGroupsForUser);
}
/**
* Test if the different chains for users are generated, and LRU cache is
* working as expected.
*/
@Test
public void testUsersChainMapWithLRUCache()
throws YarnException, IOException, InterruptedException {
Map<String, RequestInterceptorChainWrapper> pipelines;
RequestInterceptorChainWrapper chain;
refreshQueues("test1");
refreshQueues("test2");
refreshQueues("test3");
refreshQueues("test4");
refreshQueues("test5");
refreshQueues("test6");
refreshQueues("test7");
refreshQueues("test8");
pipelines = super.getRouterRMAdminService().getPipelines();
Assert.assertEquals(8, pipelines.size());
refreshQueues("test9");
refreshQueues("test10");
refreshQueues("test1");
refreshQueues("test11");
// The cache max size is defined in
// BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE
Assert.assertEquals(10, pipelines.size());
chain = pipelines.get("test1");
Assert.assertNotNull("test1 should not be evicted", chain);
chain = pipelines.get("test2");
Assert.assertNull("test2 should have been evicted", chain);
}
}