From 7444406d6d48846d06b04e82587885bb8632a16c Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Tue, 9 May 2017 19:19:27 -0700 Subject: [PATCH] YARN-5413. Create a proxy chain for ResourceManager Admin API in the Router. (Giovanni Matteo Fumarola via Subru). (cherry picked from commit 67846a5519b5905c2d925cf4c602f715b653e72c) --- .../hadoop/yarn/conf/YarnConfiguration.java | 22 +- .../src/main/resources/yarn-default.xml | 25 +- .../hadoop/yarn/util/TestLRUCacheHashMap.java | 2 +- .../server/MockResourceManagerFacade.java | 120 ++++- .../hadoop/yarn/server/router/Router.java | 10 + .../AbstractClientRequestInterceptor.java | 11 +- .../DefaultClientRequestInterceptor.java | 2 +- .../clientrm/RouterClientRMService.java | 16 +- .../AbstractRMAdminRequestInterceptor.java | 90 ++++ .../DefaultRMAdminRequestInterceptor.java | 215 +++++++++ .../rmadmin/RMAdminRequestInterceptor.java | 65 +++ .../router/rmadmin/RouterRMAdminService.java | 423 ++++++++++++++++++ .../server/router/rmadmin/package-info.java | 20 + .../clientrm/BaseRouterClientRMTest.java | 2 +- .../router/rmadmin/BaseRouterRMAdminTest.java | 346 ++++++++++++++ .../MockRMAdminRequestInterceptor.java | 36 ++ .../PassThroughRMAdminRequestInterceptor.java | 148 ++++++ .../rmadmin/TestRouterRMAdminService.java | 219 +++++++++ 18 files changed, 1750 insertions(+), 22 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cf9c2379972..1432867ab6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2639,6 +2639,8 @@ public class YarnConfiguration extends Configuration { public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; + public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host"; + public static final String ROUTER_CLIENTRM_PREFIX = ROUTER_PREFIX + "clientrm."; @@ -2654,9 +2656,23 @@ public class YarnConfiguration extends Configuration { "org.apache.hadoop.yarn.server.router.clientrm." + "DefaultClientRequestInterceptor"; - public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = - ROUTER_CLIENTRM_PREFIX + "cache-max-size"; - public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25; + public static final String ROUTER_PIPELINE_CACHE_MAX_SIZE = + ROUTER_PREFIX + "pipeline.cache-max-size"; + public static final int DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE = 25; + + public static final String ROUTER_RMADMIN_PREFIX = ROUTER_PREFIX + "rmadmin."; + + public static final String ROUTER_RMADMIN_ADDRESS = + ROUTER_RMADMIN_PREFIX + ".address"; + public static final int DEFAULT_ROUTER_RMADMIN_PORT = 8052; + public static final String DEFAULT_ROUTER_RMADMIN_ADDRESS = + "0.0.0.0:" + DEFAULT_ROUTER_RMADMIN_PORT; + + public static final String ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE = + ROUTER_RMADMIN_PREFIX + "interceptor-class.pipeline"; + public static final String DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS = + "org.apache.hadoop.yarn.server.router.rmadmin." + + "DefaultRMAdminRequestInterceptor"; //////////////////////////////// // Other Configs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 94dccd1fc58..82193257e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3177,12 +3177,33 @@ - Size of LRU cache for Router ClientRM Service. + Size of LRU cache for Router ClientRM Service and RMAdmin Service. - yarn.router.clientrm.cache-max-size + yarn.router.pipeline.cache-max-size 25 + + + The comma separated list of class names that implement the + RequestInterceptor interface. This is used by the RouterRMAdminService + to create the request processing pipeline for users. + + yarn.router.rmadmin.interceptor-class.pipeline + org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor + + + + + The actual address the server will bind to. If this optional address is + set, the RPC and webapp servers will bind to this address and the port specified in + yarn.router.address and yarn.router.webapp.address, respectively. This is + most useful for making Router listen to all interfaces by setting to 0.0.0.0. + + yarn.router.bind-host + + + Comma-separated list of PlacementRules to determine how applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java index 1cbb56c7326..9d3ec32975a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java @@ -24,7 +24,7 @@ import org.junit.Assert; import org.junit.Test; /** - * Test class to validate the correctness of the LRUCacheHashMap. + * Test class to validate the correctness of the {@code LRUCacheHashMap}. * */ public class TestLRUCacheHashMap { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index e302c70be33..696188be7ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -117,6 +118,33 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.slf4j.Logger; @@ -130,8 +158,8 @@ import com.google.common.base.Strings; * implementation is expected by the Router/AMRMProxy unit test cases. So please * change the implementation with care. */ -public class MockResourceManagerFacade - implements ApplicationClientProtocol, ApplicationMasterProtocol { +public class MockResourceManagerFacade implements ApplicationClientProtocol, + ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { private static final Logger LOG = LoggerFactory.getLogger(MockResourceManagerFacade.class); @@ -508,4 +536,92 @@ public class MockResourceManagerFacade throws YarnException, IOException { return UpdateApplicationTimeoutsResponse.newInstance(); } + + @Override + public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) + throws StandbyException, YarnException, IOException { + return RefreshQueuesResponse.newInstance(); + } + + @Override + public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) + throws StandbyException, YarnException, IOException { + return RefreshNodesResponse.newInstance(); + } + + @Override + public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( + RefreshSuperUserGroupsConfigurationRequest request) + throws StandbyException, YarnException, IOException { + return RefreshSuperUserGroupsConfigurationResponse.newInstance(); + } + + @Override + public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( + RefreshUserToGroupsMappingsRequest request) + throws StandbyException, YarnException, IOException { + return RefreshUserToGroupsMappingsResponse.newInstance(); + } + + @Override + public RefreshAdminAclsResponse refreshAdminAcls( + RefreshAdminAclsRequest request) throws YarnException, IOException { + return RefreshAdminAclsResponse.newInstance(); + } + + @Override + public RefreshServiceAclsResponse refreshServiceAcls( + RefreshServiceAclsRequest request) throws YarnException, IOException { + return RefreshServiceAclsResponse.newInstance(); + } + + @Override + public UpdateNodeResourceResponse updateNodeResource( + UpdateNodeResourceRequest request) throws YarnException, IOException { + return UpdateNodeResourceResponse.newInstance(); + } + + @Override + public RefreshNodesResourcesResponse refreshNodesResources( + RefreshNodesResourcesRequest request) throws YarnException, IOException { + return RefreshNodesResourcesResponse.newInstance(); + } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + return AddToClusterNodeLabelsResponse.newInstance(); + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) + throws YarnException, IOException { + return RemoveFromClusterNodeLabelsResponse.newInstance(); + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + return ReplaceLabelsOnNodeResponse.newInstance(); + } + + @Override + public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( + CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) + throws YarnException, IOException { + return CheckForDecommissioningNodesResponse.newInstance(null); + } + + @Override + public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( + RefreshClusterMaxPriorityRequest request) + throws YarnException, IOException { + return RefreshClusterMaxPriorityResponse.newInstance(); + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + return new String[0]; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 7cfabf5af76..d2eee5a45fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; +import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ public class Router extends CompositeService { private Configuration conf; private AtomicBoolean isStopping = new AtomicBoolean(false); private RouterClientRMService clientRMProxyService; + private RouterRMAdminService rmAdminProxyService; /** * Priority of the Router shutdown hook. @@ -71,8 +73,12 @@ public class Router extends CompositeService { @Override protected void serviceInit(Configuration config) throws Exception { this.conf = config; + // ClientRM Proxy clientRMProxyService = createClientRMProxyService(); addService(clientRMProxyService); + // RMAdmin Proxy + rmAdminProxyService = createRMAdminProxyService(); + addService(rmAdminProxyService); super.serviceInit(conf); } @@ -107,6 +113,10 @@ public class Router extends CompositeService { return new RouterClientRMService(); } + protected RouterRMAdminService createRMAdminProxyService() { + return new RouterRMAdminService(); + } + public static void main(String[] argv) { Configuration conf = new YarnConfiguration(); Thread diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java index fc6a118da24..5980b03a66b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.router.clientrm; import org.apache.hadoop.conf.Configuration; /** - * Implements the RequestInterceptor interface and provides common functionality - * which can can be used and/or extended by other concrete intercepter classes. + * Implements the {@link ClientRequestInterceptor} interface and provides common + * functionality which can can be used and/or extended by other concrete + * intercepter classes. * */ public abstract class AbstractClientRequestInterceptor @@ -31,7 +32,7 @@ public abstract class AbstractClientRequestInterceptor private ClientRequestInterceptor nextInterceptor; /** - * Sets the {@code RequestInterceptor} in the chain. + * Sets the {@link ClientRequestInterceptor} in the chain. */ @Override public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) { @@ -59,7 +60,7 @@ public abstract class AbstractClientRequestInterceptor } /** - * Initializes the {@code ClientRequestInterceptor}. + * Initializes the {@link ClientRequestInterceptor}. */ @Override public void init(String user) { @@ -69,7 +70,7 @@ public abstract class AbstractClientRequestInterceptor } /** - * Disposes the {@code ClientRequestInterceptor}. + * Disposes the {@link ClientRequestInterceptor}. */ @Override public void shutdown() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java index 12b933b8488..9e2bfed9543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java @@ -91,7 +91,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; /** - * Extends the AbstractRequestInterceptorClient class and provides an + * Extends the {@code AbstractRequestInterceptorClient} class and provides an * implementation that simply forwards the client requests to the cluster * resource manager. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 00016dd7aed..fd2c610c7fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -104,10 +104,11 @@ import com.google.common.annotations.VisibleForTesting; /** * RouterClientRMService is a service that runs on each router that can be used - * to intercept and inspect ApplicationClientProtocol messages from client to - * the cluster resource manager. It listens ApplicationClientProtocol messages - * from the client and creates a request intercepting pipeline instance for each - * client. The pipeline is a chain of intercepter instances that can inspect and + * to intercept and inspect {@link ApplicationClientProtocol} messages from + * client to the cluster resource manager. It listens + * {@link ApplicationClientProtocol} messages from the client and creates a + * request intercepting pipeline instance for each client. The pipeline is a + * chain of {@link ClientRequestInterceptor} instances that can inspect and * modify the request/response as needed. The main difference with * AMRMProxyService is the protocol they implement. */ @@ -137,13 +138,14 @@ public class RouterClientRMService extends AbstractService UserGroupInformation.setConfiguration(conf); this.listenerEndpoint = - conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, + conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST, + YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS, YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT); int maxCacheSize = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE); + conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); this.userPipelineMap = Collections.synchronizedMap( new LRUCacheHashMap( maxCacheSize, true)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java new file mode 100644 index 00000000000..a4972fcb9ad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.hadoop.conf.Configuration; + +/** + * Implements the {@link RMAdminRequestInterceptor} interface and provides + * common functionality which can can be used and/or extended by other concrete + * intercepter classes. + * + */ +public abstract class AbstractRMAdminRequestInterceptor + implements RMAdminRequestInterceptor { + private Configuration conf; + private RMAdminRequestInterceptor nextInterceptor; + + /** + * Sets the {@link RMAdminRequestInterceptor} in the chain. + */ + @Override + public void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor) { + this.nextInterceptor = nextInterceptor; + } + + /** + * Sets the {@link Configuration}. + */ + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (this.nextInterceptor != null) { + this.nextInterceptor.setConf(conf); + } + } + + /** + * Gets the {@link Configuration}. + */ + @Override + public Configuration getConf() { + return this.conf; + } + + /** + * Initializes the {@link RMAdminRequestInterceptor}. + */ + @Override + public void init(String user) { + if (this.nextInterceptor != null) { + this.nextInterceptor.init(user); + } + } + + /** + * Disposes the {@link RMAdminRequestInterceptor}. + */ + @Override + public void shutdown() { + if (this.nextInterceptor != null) { + this.nextInterceptor.shutdown(); + } + } + + /** + * Gets the next {@link RMAdminRequestInterceptor} in the chain. + */ + @Override + public RMAdminRequestInterceptor getNextInterceptor() { + return this.nextInterceptor; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java new file mode 100644 index 00000000000..7e6a1ff28d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an + * implementation that simply forwards the client requests to the cluster + * resource manager. + * + */ +public class DefaultRMAdminRequestInterceptor + extends AbstractRMAdminRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultRMAdminRequestInterceptor.class); + private ResourceManagerAdministrationProtocol rmAdminProxy; + private UserGroupInformation user = null; + + @Override + public void init(String userName) { + super.init(userName); + try { + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + + final Configuration conf = this.getConf(); + + rmAdminProxy = user.doAs( + new PrivilegedExceptionAction() { + @Override + public ResourceManagerAdministrationProtocol run() + throws Exception { + return ClientRMProxy.createRMProxy(conf, + ResourceManagerAdministrationProtocol.class); + } + }); + } catch (IOException e) { + String message = "Error while creating Router RMAdmin Service for user:"; + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public void setNextInterceptor(RMAdminRequestInterceptor next) { + throw new YarnRuntimeException("setNextInterceptor is being called on " + + "DefaultRMAdminRequestInterceptor, which should be the last one " + + "in the chain. Check if the interceptor pipeline configuration " + + "is correct"); + } + + @VisibleForTesting + public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) { + this.rmAdminProxy = rmAdmin; + + } + + @Override + public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) + throws StandbyException, YarnException, IOException { + return rmAdminProxy.refreshQueues(request); + } + + @Override + public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) + throws StandbyException, YarnException, IOException { + return rmAdminProxy.refreshNodes(request); + } + + @Override + public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( + RefreshSuperUserGroupsConfigurationRequest request) + throws StandbyException, YarnException, IOException { + return rmAdminProxy.refreshSuperUserGroupsConfiguration(request); + } + + @Override + public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( + RefreshUserToGroupsMappingsRequest request) + throws StandbyException, YarnException, IOException { + return rmAdminProxy.refreshUserToGroupsMappings(request); + } + + @Override + public RefreshAdminAclsResponse refreshAdminAcls( + RefreshAdminAclsRequest request) throws YarnException, IOException { + return rmAdminProxy.refreshAdminAcls(request); + } + + @Override + public RefreshServiceAclsResponse refreshServiceAcls( + RefreshServiceAclsRequest request) throws YarnException, IOException { + return rmAdminProxy.refreshServiceAcls(request); + } + + @Override + public UpdateNodeResourceResponse updateNodeResource( + UpdateNodeResourceRequest request) throws YarnException, IOException { + return rmAdminProxy.updateNodeResource(request); + } + + @Override + public RefreshNodesResourcesResponse refreshNodesResources( + RefreshNodesResourcesRequest request) throws YarnException, IOException { + return rmAdminProxy.refreshNodesResources(request); + } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + return rmAdminProxy.addToClusterNodeLabels(request); + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) + throws YarnException, IOException { + return rmAdminProxy.removeFromClusterNodeLabels(request); + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + return rmAdminProxy.replaceLabelsOnNode(request); + } + + @Override + public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( + CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) + throws YarnException, IOException { + return rmAdminProxy + .checkForDecommissioningNodes(checkForDecommissioningNodesRequest); + } + + @Override + public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( + RefreshClusterMaxPriorityRequest request) + throws YarnException, IOException { + return rmAdminProxy.refreshClusterMaxPriority(request); + } + + @Override + public String[] getGroupsForUser(String userName) throws IOException { + return rmAdminProxy.getGroupsForUser(userName); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java new file mode 100644 index 00000000000..dc4bda01b90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the client to + * the resource manager. + */ +public interface RMAdminRequestInterceptor + extends ResourceManagerAdministrationProtocol, Configurable { + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param user the name of the client + */ + void init(String user); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any. + */ + void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call. + * + * @param nextInterceptor the RMAdminRequestInterceptor to set in the pipeline + */ + void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor); + + /** + * Returns the next intercepter in the chain. + * + * @return the next intercepter in the chain + */ + RMAdminRequestInterceptor getNextInterceptor(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java new file mode 100644 index 00000000000..b8b7ad818f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.util.LRUCacheHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * RouterRMAdminService is a service that runs on each router that can be used + * to intercept and inspect {@code ResourceManagerAdministrationProtocol} + * messages from client to the cluster resource manager. It listens + * {@code ResourceManagerAdministrationProtocol} messages from the client and + * creates a request intercepting pipeline instance for each client. The + * pipeline is a chain of intercepter instances that can inspect and modify the + * request/response as needed. The main difference with AMRMProxyService is the + * protocol they implement. + */ +public class RouterRMAdminService extends AbstractService + implements ResourceManagerAdministrationProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterRMAdminService.class); + + private Server server; + private InetSocketAddress listenerEndpoint; + + // For each user we store an interceptors' pipeline. + // For performance issue we use LRU cache to keep in memory the newest ones + // and remove the oldest used ones. + private Map userPipelineMap; + + public RouterRMAdminService() { + super(RouterRMAdminService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting Router RMAdmin Service"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST, + YarnConfiguration.ROUTER_RMADMIN_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_RMADMIN_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_RMADMIN_PORT); + + int maxCacheSize = + conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap( + new LRUCacheHashMap( + maxCacheSize, true)); + + Configuration serverConf = new Configuration(conf); + + int numWorkerThreads = + serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT); + + this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class, + this, listenerEndpoint, serverConf, null, numWorkerThreads); + + this.server.start(); + LOG.info("Router RMAdminService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping Router RMAdminService"); + if (this.server != null) { + this.server.stop(); + } + userPipelineMap.clear(); + super.serviceStop(); + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param conf + * @return the intercepter class names as an instance of ArrayList + */ + private List getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = + conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + private RequestInterceptorChainWrapper getInterceptorChain() + throws IOException { + String user = UserGroupInformation.getCurrentUser().getUserName(); + if (!userPipelineMap.containsKey(user)) { + initializePipeline(user); + } + return userPipelineMap.get(user); + } + + /** + * Gets the Request intercepter chains for all the users. + * + * @return the request intercepter chains. + */ + @VisibleForTesting + protected Map getPipelines() { + return this.userPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + @VisibleForTesting + protected RMAdminRequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List interceptorClassNames = getInterceptorClassNames(conf); + + RMAdminRequestInterceptor pipeline = null; + RMAdminRequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(interceptorClassName); + if (RMAdminRequestInterceptor.class + .isAssignableFrom(interceptorClass)) { + RMAdminRequestInterceptor interceptorInstance = + (RMAdminRequestInterceptor) ReflectionUtils + .newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException( + "Class: " + interceptorClassName + " not instance of " + + RMAdminRequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate RMAdminRequestInterceptor: " + + interceptorClassName, + e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Initializes the request intercepter pipeline for the specified user. + * + * @param user + */ + private void initializePipeline(String user) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info("Request to start an already existing user: {}" + + " was received, so ignoring.", user); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.userPipelineMap.put(user, chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info("Initializing request processing pipeline for the user: {}", user); + + try { + RMAdminRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + synchronized (this.userPipelineMap) { + this.userPipelineMap.remove(user); + } + throw e; + } + } + + /** + * Private structure for encapsulating RequestInterceptor and user instances. + * + */ + @Private + public static class RequestInterceptorChainWrapper { + private RMAdminRequestInterceptor rootInterceptor; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param interceptor the first interceptor in the pipeline + */ + public synchronized void init(RMAdminRequestInterceptor interceptor) { + this.rootInterceptor = interceptor; + } + + /** + * Gets the root request intercepter. + * + * @return the root request intercepter + */ + public synchronized RMAdminRequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Shutdown the chain of interceptors when the object is destroyed. + */ + @Override + protected void finalize() { + rootInterceptor.shutdown(); + } + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getGroupsForUser(user); + } + + @Override + public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) + throws StandbyException, YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshQueues(request); + + } + + @Override + public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) + throws StandbyException, YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshNodes(request); + + } + + @Override + public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( + RefreshSuperUserGroupsConfigurationRequest request) + throws StandbyException, YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor() + .refreshSuperUserGroupsConfiguration(request); + + } + + @Override + public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( + RefreshUserToGroupsMappingsRequest request) + throws StandbyException, YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshUserToGroupsMappings(request); + + } + + @Override + public RefreshAdminAclsResponse refreshAdminAcls( + RefreshAdminAclsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshAdminAcls(request); + + } + + @Override + public RefreshServiceAclsResponse refreshServiceAcls( + RefreshServiceAclsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshServiceAcls(request); + + } + + @Override + public UpdateNodeResourceResponse updateNodeResource( + UpdateNodeResourceRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateNodeResource(request); + + } + + @Override + public RefreshNodesResourcesResponse refreshNodesResources( + RefreshNodesResourcesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshNodesResources(request); + + } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().addToClusterNodeLabels(request); + + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().removeFromClusterNodeLabels(request); + + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().replaceLabelsOnNode(request); + + } + + @Override + public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( + CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor() + .checkForDecommissioningNodes(checkForDecommissioningNodesRequest); + } + + @Override + public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( + RefreshClusterMaxPriorityRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().refreshClusterMaxPriority(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java new file mode 100644 index 00000000000..98a7ed0841c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Router RM Admin Proxy Service package. **/ +package org.apache.hadoop.yarn.server.router.rmadmin; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java index a283a624aa6..7e1508493ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java @@ -133,7 +133,7 @@ public abstract class BaseRouterClientRMTest { + "," + mockPassThroughInterceptorClass + "," + MockClientRequestInterceptor.class.getName()); - this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, TEST_MAX_CACHE_SIZE); this.dispatcher = new AsyncDispatcher(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java new file mode 100644 index 00000000000..d3eba618028 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the RouterRMAdminService test cases. It provides utility + * methods that can be used by the concrete test case classes. + * + */ +public abstract class BaseRouterRMAdminTest { + + /** + * The RouterRMAdminService instance that will be used by all the test cases. + */ + private MockRouterRMAdminService rmAdminService; + /** + * Thread pool used for asynchronous operations. + */ + private static ExecutorService threadpool = Executors.newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + public final static int TEST_MAX_CACHE_SIZE = 10; + + protected MockRouterRMAdminService getRouterRMAdminService() { + Assert.assertNotNull(this.rmAdminService); + return this.rmAdminService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + String mockPassThroughInterceptorClass = + PassThroughRMAdminRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. The others in the chain will + // simply forward it to the next one in the chain + this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + + MockRMAdminRequestInterceptor.class.getName()); + + this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, + TEST_MAX_CACHE_SIZE); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.rmAdminService = createAndStartRouterRMAdminService(); + } + + @After + public void tearDown() { + if (rmAdminService != null) { + rmAdminService.stop(); + rmAdminService = null; + } + if (this.dispatcher != null) { + this.dispatcher.stop(); + } + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockRouterRMAdminService createAndStartRouterRMAdminService() { + MockRouterRMAdminService svc = new MockRouterRMAdminService(); + svc.init(conf); + svc.start(); + return svc; + } + + protected static class MockRouterRMAdminService extends RouterRMAdminService { + public MockRouterRMAdminService() { + super(); + } + } + + protected RefreshQueuesResponse refreshQueues(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RefreshQueuesResponse run() throws Exception { + RefreshQueuesRequest req = RefreshQueuesRequest.newInstance(); + RefreshQueuesResponse response = + getRouterRMAdminService().refreshQueues(req); + return response; + } + }); + } + + protected RefreshNodesResponse refreshNodes(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RefreshNodesResponse run() throws Exception { + RefreshNodesRequest req = RefreshNodesRequest.newInstance(); + RefreshNodesResponse response = + getRouterRMAdminService().refreshNodes(req); + return response; + } + }); + } + + protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( + String user) throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RefreshSuperUserGroupsConfigurationResponse run() + throws Exception { + RefreshSuperUserGroupsConfigurationRequest req = + RefreshSuperUserGroupsConfigurationRequest.newInstance(); + RefreshSuperUserGroupsConfigurationResponse response = + getRouterRMAdminService() + .refreshSuperUserGroupsConfiguration(req); + return response; + } + }); + } + + protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( + String user) throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RefreshUserToGroupsMappingsResponse run() throws Exception { + RefreshUserToGroupsMappingsRequest req = + RefreshUserToGroupsMappingsRequest.newInstance(); + RefreshUserToGroupsMappingsResponse response = + getRouterRMAdminService().refreshUserToGroupsMappings(req); + return response; + } + }); + } + + protected RefreshAdminAclsResponse refreshAdminAcls(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RefreshAdminAclsResponse run() throws Exception { + RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance(); + RefreshAdminAclsResponse response = + getRouterRMAdminService().refreshAdminAcls(req); + return response; + } + }); + } + + protected RefreshServiceAclsResponse refreshServiceAcls(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RefreshServiceAclsResponse run() throws Exception { + RefreshServiceAclsRequest req = + RefreshServiceAclsRequest.newInstance(); + RefreshServiceAclsResponse response = + getRouterRMAdminService().refreshServiceAcls(req); + return response; + } + }); + } + + protected UpdateNodeResourceResponse updateNodeResource(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public UpdateNodeResourceResponse run() throws Exception { + UpdateNodeResourceRequest req = + UpdateNodeResourceRequest.newInstance(null); + UpdateNodeResourceResponse response = + getRouterRMAdminService().updateNodeResource(req); + return response; + } + }); + } + + protected RefreshNodesResourcesResponse refreshNodesResources(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RefreshNodesResourcesResponse run() throws Exception { + RefreshNodesResourcesRequest req = + RefreshNodesResourcesRequest.newInstance(); + RefreshNodesResourcesResponse response = + getRouterRMAdminService().refreshNodesResources(req); + return response; + } + }); + } + + protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public AddToClusterNodeLabelsResponse run() throws Exception { + AddToClusterNodeLabelsRequest req = + AddToClusterNodeLabelsRequest.newInstance(null); + AddToClusterNodeLabelsResponse response = + getRouterRMAdminService().addToClusterNodeLabels(req); + return response; + } + }); + } + + protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + String user) throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RemoveFromClusterNodeLabelsResponse run() throws Exception { + RemoveFromClusterNodeLabelsRequest req = + RemoveFromClusterNodeLabelsRequest.newInstance(null); + RemoveFromClusterNodeLabelsResponse response = + getRouterRMAdminService().removeFromClusterNodeLabels(req); + return response; + } + }); + } + + protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReplaceLabelsOnNodeResponse run() throws Exception { + ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest + .newInstance(new HashMap>()); + ReplaceLabelsOnNodeResponse response = + getRouterRMAdminService().replaceLabelsOnNode(req); + return response; + } + }); + } + + protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes( + String user) throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public CheckForDecommissioningNodesResponse run() throws Exception { + CheckForDecommissioningNodesRequest req = + CheckForDecommissioningNodesRequest.newInstance(); + CheckForDecommissioningNodesResponse response = + getRouterRMAdminService().checkForDecommissioningNodes(req); + return response; + } + }); + } + + protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( + String user) throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public RefreshClusterMaxPriorityResponse run() throws Exception { + RefreshClusterMaxPriorityRequest req = + RefreshClusterMaxPriorityRequest.newInstance(); + RefreshClusterMaxPriorityResponse response = + getRouterRMAdminService().refreshClusterMaxPriority(req); + return response; + } + }); + } + + protected String[] getGroupsForUser(String user) + throws IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public String[] run() throws Exception { + String[] response = + getRouterRMAdminService().getGroupsForUser(user); + return response; + } + }); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java new file mode 100644 index 00000000000..ab7bdb41ed2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; + +/** + * This class mocks the RMAmdinRequestInterceptor. + */ +public class MockRMAdminRequestInterceptor + extends DefaultRMAdminRequestInterceptor { + + public void init(String user) { + MockResourceManagerFacade mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + super.setRMAdmin(mockRM); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java new file mode 100644 index 00000000000..38dcc3d96d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import java.io.IOException; + +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; + +/** + * Mock intercepter that does not do anything other than forwarding it to the + * next intercepter in the chain. + */ +public class PassThroughRMAdminRequestInterceptor + extends AbstractRMAdminRequestInterceptor { + + @Override + public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) + throws StandbyException, YarnException, IOException { + return getNextInterceptor().refreshQueues(request); + } + + @Override + public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) + throws StandbyException, YarnException, IOException { + return getNextInterceptor().refreshNodes(request); + } + + @Override + public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( + RefreshSuperUserGroupsConfigurationRequest request) + throws StandbyException, YarnException, IOException { + return getNextInterceptor().refreshSuperUserGroupsConfiguration(request); + } + + @Override + public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( + RefreshUserToGroupsMappingsRequest request) + throws StandbyException, YarnException, IOException { + return getNextInterceptor().refreshUserToGroupsMappings(request); + } + + @Override + public RefreshAdminAclsResponse refreshAdminAcls( + RefreshAdminAclsRequest request) throws YarnException, IOException { + return getNextInterceptor().refreshAdminAcls(request); + } + + @Override + public RefreshServiceAclsResponse refreshServiceAcls( + RefreshServiceAclsRequest request) throws YarnException, IOException { + return getNextInterceptor().refreshServiceAcls(request); + } + + @Override + public UpdateNodeResourceResponse updateNodeResource( + UpdateNodeResourceRequest request) throws YarnException, IOException { + return getNextInterceptor().updateNodeResource(request); + } + + @Override + public RefreshNodesResourcesResponse refreshNodesResources( + RefreshNodesResourcesRequest request) throws YarnException, IOException { + return getNextInterceptor().refreshNodesResources(request); + } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().addToClusterNodeLabels(request); + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) + throws YarnException, IOException { + return getNextInterceptor().removeFromClusterNodeLabels(request); + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + return getNextInterceptor().replaceLabelsOnNode(request); + } + + @Override + public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( + CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) + throws YarnException, IOException { + return getNextInterceptor() + .checkForDecommissioningNodes(checkForDecommissioningNodesRequest); + } + + @Override + public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( + RefreshClusterMaxPriorityRequest request) + throws YarnException, IOException { + return getNextInterceptor().refreshClusterMaxPriority(request); + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + return getNextInterceptor().getGroupsForUser(user); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java new file mode 100644 index 00000000000..11786e6f980 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java @@ -0,0 +1,219 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.router.rmadmin; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService.RequestInterceptorChainWrapper; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test class to validate the RMAdmin Service inside the Router. + */ +public class TestRouterRMAdminService extends BaseRouterRMAdminTest { + + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterRMAdminService.class); + + /** + * Tests if the pipeline is created properly. + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + RMAdminRequestInterceptor root = + super.getRouterRMAdminService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + // The current pipeline is: + // PassThroughRMAdminRequestInterceptor - index = 0 + // PassThroughRMAdminRequestInterceptor - index = 1 + // PassThroughRMAdminRequestInterceptor - index = 2 + // MockClientRequestInterceptor - index = 3 + switch (index) { + case 0: // Fall to the next case + case 1: // Fall to the next case + case 2: + // If index is equal to 0,1 or 2 we fall in this check + Assert.assertEquals( + PassThroughRMAdminRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 3: + Assert.assertEquals(MockRMAdminRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + default: + Assert.fail(); + } + root = root.getNextInterceptor(); + index++; + } + Assert.assertEquals("The number of interceptors in chain does not match", 4, + index); + } + + /** + * Test if the RouterRMAdmin forwards all the requests to the MockRM and get + * back the responses. + */ + @Test + public void testRouterRMAdminServiceE2E() throws Exception { + + String user = "test1"; + + LOG.info("testRouterRMAdminServiceE2E - Refresh Queues"); + + RefreshQueuesResponse responseRefreshQueues = refreshQueues(user); + Assert.assertNotNull(responseRefreshQueues); + + LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes"); + + RefreshNodesResponse responseRefreshNodes = refreshNodes(user); + Assert.assertNotNull(responseRefreshNodes); + + LOG.info("testRouterRMAdminServiceE2E - Refresh Super User"); + + RefreshSuperUserGroupsConfigurationResponse responseRefreshSuperUser = + refreshSuperUserGroupsConfiguration(user); + Assert.assertNotNull(responseRefreshSuperUser); + + LOG.info("testRouterRMAdminServiceE2E - Refresh User to Group"); + + RefreshUserToGroupsMappingsResponse responseRefreshUserToGroup = + refreshUserToGroupsMappings(user); + Assert.assertNotNull(responseRefreshUserToGroup); + + LOG.info("testRouterRMAdminServiceE2E - Refresh Admin Acls"); + + RefreshAdminAclsResponse responseRefreshAdminAcls = refreshAdminAcls(user); + Assert.assertNotNull(responseRefreshAdminAcls); + + LOG.info("testRouterRMAdminServiceE2E - Refresh Service Acls"); + + RefreshServiceAclsResponse responseRefreshServiceAcls = + refreshServiceAcls(user); + Assert.assertNotNull(responseRefreshServiceAcls); + + LOG.info("testRouterRMAdminServiceE2E - Update Node Resource"); + + UpdateNodeResourceResponse responseUpdateNodeResource = + updateNodeResource(user); + Assert.assertNotNull(responseUpdateNodeResource); + + LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes Resource"); + + RefreshNodesResourcesResponse responseRefreshNodesResources = + refreshNodesResources(user); + Assert.assertNotNull(responseRefreshNodesResources); + + LOG.info("testRouterRMAdminServiceE2E - Add To Cluster NodeLabels"); + + AddToClusterNodeLabelsResponse responseAddToClusterNodeLabels = + addToClusterNodeLabels(user); + Assert.assertNotNull(responseAddToClusterNodeLabels); + + LOG.info("testRouterRMAdminServiceE2E - Remove To Cluster NodeLabels"); + + RemoveFromClusterNodeLabelsResponse responseRemoveFromClusterNodeLabels = + removeFromClusterNodeLabels(user); + Assert.assertNotNull(responseRemoveFromClusterNodeLabels); + + LOG.info("testRouterRMAdminServiceE2E - Replace Labels On Node"); + + ReplaceLabelsOnNodeResponse responseReplaceLabelsOnNode = + replaceLabelsOnNode(user); + Assert.assertNotNull(responseReplaceLabelsOnNode); + + LOG.info("testRouterRMAdminServiceE2E - Check For Decommissioning Nodes"); + + CheckForDecommissioningNodesResponse responseCheckForDecom = + checkForDecommissioningNodes(user); + Assert.assertNotNull(responseCheckForDecom); + + LOG.info("testRouterRMAdminServiceE2E - Refresh Cluster Max Priority"); + + RefreshClusterMaxPriorityResponse responseRefreshClusterMaxPriority = + refreshClusterMaxPriority(user); + Assert.assertNotNull(responseRefreshClusterMaxPriority); + + LOG.info("testRouterRMAdminServiceE2E - Get Groups For User"); + + String[] responseGetGroupsForUser = getGroupsForUser(user); + Assert.assertNotNull(responseGetGroupsForUser); + + } + + /** + * Test if the different chains for users are generated, and LRU cache is + * working as expected. + */ + @Test + public void testUsersChainMapWithLRUCache() + throws YarnException, IOException, InterruptedException { + + Map pipelines; + RequestInterceptorChainWrapper chain; + + refreshQueues("test1"); + refreshQueues("test2"); + refreshQueues("test3"); + refreshQueues("test4"); + refreshQueues("test5"); + refreshQueues("test6"); + refreshQueues("test7"); + refreshQueues("test8"); + + pipelines = super.getRouterRMAdminService().getPipelines(); + Assert.assertEquals(8, pipelines.size()); + + refreshQueues("test9"); + refreshQueues("test10"); + refreshQueues("test1"); + refreshQueues("test11"); + + // The cache max size is defined in + // BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE + Assert.assertEquals(10, pipelines.size()); + + chain = pipelines.get("test1"); + Assert.assertNotNull("test1 should not be evicted", chain); + + chain = pipelines.get("test2"); + Assert.assertNull("test2 should have been evicted", chain); + } + +}