diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cf9c2379972..1432867ab6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2639,6 +2639,8 @@ public class YarnConfiguration extends Configuration {
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
+ public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
+
public static final String ROUTER_CLIENTRM_PREFIX =
ROUTER_PREFIX + "clientrm.";
@@ -2654,9 +2656,23 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.clientrm."
+ "DefaultClientRequestInterceptor";
- public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE =
- ROUTER_CLIENTRM_PREFIX + "cache-max-size";
- public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25;
+ public static final String ROUTER_PIPELINE_CACHE_MAX_SIZE =
+ ROUTER_PREFIX + "pipeline.cache-max-size";
+ public static final int DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE = 25;
+
+ public static final String ROUTER_RMADMIN_PREFIX = ROUTER_PREFIX + "rmadmin.";
+
+ public static final String ROUTER_RMADMIN_ADDRESS =
+ ROUTER_RMADMIN_PREFIX + ".address";
+ public static final int DEFAULT_ROUTER_RMADMIN_PORT = 8052;
+ public static final String DEFAULT_ROUTER_RMADMIN_ADDRESS =
+ "0.0.0.0:" + DEFAULT_ROUTER_RMADMIN_PORT;
+
+ public static final String ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE =
+ ROUTER_RMADMIN_PREFIX + "interceptor-class.pipeline";
+ public static final String DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS =
+ "org.apache.hadoop.yarn.server.router.rmadmin."
+ + "DefaultRMAdminRequestInterceptor";
////////////////////////////////
// Other Configs
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 94dccd1fc58..82193257e59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3177,12 +3177,33 @@
- Size of LRU cache for Router ClientRM Service.
+ Size of LRU cache for Router ClientRM Service and RMAdmin Service.
- yarn.router.clientrm.cache-max-size
+ yarn.router.pipeline.cache-max-size
25
+
+
+ The comma separated list of class names that implement the
+ RequestInterceptor interface. This is used by the RouterRMAdminService
+ to create the request processing pipeline for users.
+
+ yarn.router.rmadmin.interceptor-class.pipeline
+ org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor
+
+
+
+
+ The actual address the server will bind to. If this optional address is
+ set, the RPC and webapp servers will bind to this address and the port specified in
+ yarn.router.address and yarn.router.webapp.address, respectively. This is
+ most useful for making Router listen to all interfaces by setting to 0.0.0.0.
+
+ yarn.router.bind-host
+
+
+
Comma-separated list of PlacementRules to determine how applications
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
index 1cbb56c7326..9d3ec32975a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
@@ -24,7 +24,7 @@ import org.junit.Assert;
import org.junit.Test;
/**
- * Test class to validate the correctness of the LRUCacheHashMap.
+ * Test class to validate the correctness of the {@code LRUCacheHashMap}.
*
*/
public class TestLRUCacheHashMap {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index e302c70be33..696188be7ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -117,6 +118,33 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -130,8 +158,8 @@ import com.google.common.base.Strings;
* implementation is expected by the Router/AMRMProxy unit test cases. So please
* change the implementation with care.
*/
-public class MockResourceManagerFacade
- implements ApplicationClientProtocol, ApplicationMasterProtocol {
+public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(MockResourceManagerFacade.class);
@@ -508,4 +536,92 @@ public class MockResourceManagerFacade
throws YarnException, IOException {
return UpdateApplicationTimeoutsResponse.newInstance();
}
+
+ @Override
+ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+ throws StandbyException, YarnException, IOException {
+ return RefreshQueuesResponse.newInstance();
+ }
+
+ @Override
+ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+ throws StandbyException, YarnException, IOException {
+ return RefreshNodesResponse.newInstance();
+ }
+
+ @Override
+ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+ RefreshSuperUserGroupsConfigurationRequest request)
+ throws StandbyException, YarnException, IOException {
+ return RefreshSuperUserGroupsConfigurationResponse.newInstance();
+ }
+
+ @Override
+ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+ RefreshUserToGroupsMappingsRequest request)
+ throws StandbyException, YarnException, IOException {
+ return RefreshUserToGroupsMappingsResponse.newInstance();
+ }
+
+ @Override
+ public RefreshAdminAclsResponse refreshAdminAcls(
+ RefreshAdminAclsRequest request) throws YarnException, IOException {
+ return RefreshAdminAclsResponse.newInstance();
+ }
+
+ @Override
+ public RefreshServiceAclsResponse refreshServiceAcls(
+ RefreshServiceAclsRequest request) throws YarnException, IOException {
+ return RefreshServiceAclsResponse.newInstance();
+ }
+
+ @Override
+ public UpdateNodeResourceResponse updateNodeResource(
+ UpdateNodeResourceRequest request) throws YarnException, IOException {
+ return UpdateNodeResourceResponse.newInstance();
+ }
+
+ @Override
+ public RefreshNodesResourcesResponse refreshNodesResources(
+ RefreshNodesResourcesRequest request) throws YarnException, IOException {
+ return RefreshNodesResourcesResponse.newInstance();
+ }
+
+ @Override
+ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+ AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return AddToClusterNodeLabelsResponse.newInstance();
+ }
+
+ @Override
+ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+ RemoveFromClusterNodeLabelsRequest request)
+ throws YarnException, IOException {
+ return RemoveFromClusterNodeLabelsResponse.newInstance();
+ }
+
+ @Override
+ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+ ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+ return ReplaceLabelsOnNodeResponse.newInstance();
+ }
+
+ @Override
+ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+ CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+ throws YarnException, IOException {
+ return CheckForDecommissioningNodesResponse.newInstance(null);
+ }
+
+ @Override
+ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+ RefreshClusterMaxPriorityRequest request)
+ throws YarnException, IOException {
+ return RefreshClusterMaxPriorityResponse.newInstance();
+ }
+
+ @Override
+ public String[] getGroupsForUser(String user) throws IOException {
+ return new String[0];
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 7cfabf5af76..d2eee5a45fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public class Router extends CompositeService {
private Configuration conf;
private AtomicBoolean isStopping = new AtomicBoolean(false);
private RouterClientRMService clientRMProxyService;
+ private RouterRMAdminService rmAdminProxyService;
/**
* Priority of the Router shutdown hook.
@@ -71,8 +73,12 @@ public class Router extends CompositeService {
@Override
protected void serviceInit(Configuration config) throws Exception {
this.conf = config;
+ // ClientRM Proxy
clientRMProxyService = createClientRMProxyService();
addService(clientRMProxyService);
+ // RMAdmin Proxy
+ rmAdminProxyService = createRMAdminProxyService();
+ addService(rmAdminProxyService);
super.serviceInit(conf);
}
@@ -107,6 +113,10 @@ public class Router extends CompositeService {
return new RouterClientRMService();
}
+ protected RouterRMAdminService createRMAdminProxyService() {
+ return new RouterRMAdminService();
+ }
+
public static void main(String[] argv) {
Configuration conf = new YarnConfiguration();
Thread
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
index fc6a118da24..5980b03a66b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.router.clientrm;
import org.apache.hadoop.conf.Configuration;
/**
- * Implements the RequestInterceptor interface and provides common functionality
- * which can can be used and/or extended by other concrete intercepter classes.
+ * Implements the {@link ClientRequestInterceptor} interface and provides common
+ * functionality which can can be used and/or extended by other concrete
+ * intercepter classes.
*
*/
public abstract class AbstractClientRequestInterceptor
@@ -31,7 +32,7 @@ public abstract class AbstractClientRequestInterceptor
private ClientRequestInterceptor nextInterceptor;
/**
- * Sets the {@code RequestInterceptor} in the chain.
+ * Sets the {@link ClientRequestInterceptor} in the chain.
*/
@Override
public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
@@ -59,7 +60,7 @@ public abstract class AbstractClientRequestInterceptor
}
/**
- * Initializes the {@code ClientRequestInterceptor}.
+ * Initializes the {@link ClientRequestInterceptor}.
*/
@Override
public void init(String user) {
@@ -69,7 +70,7 @@ public abstract class AbstractClientRequestInterceptor
}
/**
- * Disposes the {@code ClientRequestInterceptor}.
+ * Disposes the {@link ClientRequestInterceptor}.
*/
@Override
public void shutdown() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
index 12b933b8488..9e2bfed9543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -91,7 +91,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
- * Extends the AbstractRequestInterceptorClient class and provides an
+ * Extends the {@code AbstractRequestInterceptorClient} class and provides an
* implementation that simply forwards the client requests to the cluster
* resource manager.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
index 00016dd7aed..fd2c610c7fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
@@ -104,10 +104,11 @@ import com.google.common.annotations.VisibleForTesting;
/**
* RouterClientRMService is a service that runs on each router that can be used
- * to intercept and inspect ApplicationClientProtocol messages from client to
- * the cluster resource manager. It listens ApplicationClientProtocol messages
- * from the client and creates a request intercepting pipeline instance for each
- * client. The pipeline is a chain of intercepter instances that can inspect and
+ * to intercept and inspect {@link ApplicationClientProtocol} messages from
+ * client to the cluster resource manager. It listens
+ * {@link ApplicationClientProtocol} messages from the client and creates a
+ * request intercepting pipeline instance for each client. The pipeline is a
+ * chain of {@link ClientRequestInterceptor} instances that can inspect and
* modify the request/response as needed. The main difference with
* AMRMProxyService is the protocol they implement.
*/
@@ -137,13 +138,14 @@ public class RouterClientRMService extends AbstractService
UserGroupInformation.setConfiguration(conf);
this.listenerEndpoint =
- conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
+ conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
+ YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
int maxCacheSize =
- conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
- YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE);
+ conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+ YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(
new LRUCacheHashMap(
maxCacheSize, true));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java
new file mode 100644
index 00000000000..a4972fcb9ad
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the {@link RMAdminRequestInterceptor} interface and provides
+ * common functionality which can can be used and/or extended by other concrete
+ * intercepter classes.
+ *
+ */
+public abstract class AbstractRMAdminRequestInterceptor
+ implements RMAdminRequestInterceptor {
+ private Configuration conf;
+ private RMAdminRequestInterceptor nextInterceptor;
+
+ /**
+ * Sets the {@link RMAdminRequestInterceptor} in the chain.
+ */
+ @Override
+ public void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor) {
+ this.nextInterceptor = nextInterceptor;
+ }
+
+ /**
+ * Sets the {@link Configuration}.
+ */
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.setConf(conf);
+ }
+ }
+
+ /**
+ * Gets the {@link Configuration}.
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * Initializes the {@link RMAdminRequestInterceptor}.
+ */
+ @Override
+ public void init(String user) {
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.init(user);
+ }
+ }
+
+ /**
+ * Disposes the {@link RMAdminRequestInterceptor}.
+ */
+ @Override
+ public void shutdown() {
+ if (this.nextInterceptor != null) {
+ this.nextInterceptor.shutdown();
+ }
+ }
+
+ /**
+ * Gets the next {@link RMAdminRequestInterceptor} in the chain.
+ */
+ @Override
+ public RMAdminRequestInterceptor getNextInterceptor() {
+ return this.nextInterceptor;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
new file mode 100644
index 00000000000..7e6a1ff28d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an
+ * implementation that simply forwards the client requests to the cluster
+ * resource manager.
+ *
+ */
+public class DefaultRMAdminRequestInterceptor
+ extends AbstractRMAdminRequestInterceptor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultRMAdminRequestInterceptor.class);
+ private ResourceManagerAdministrationProtocol rmAdminProxy;
+ private UserGroupInformation user = null;
+
+ @Override
+ public void init(String userName) {
+ super.init(userName);
+ try {
+ // Do not create a proxy user if user name matches the user name on
+ // current UGI
+ if (userName.equalsIgnoreCase(
+ UserGroupInformation.getCurrentUser().getUserName())) {
+ user = UserGroupInformation.getCurrentUser();
+ } else {
+ user = UserGroupInformation.createProxyUser(userName,
+ UserGroupInformation.getCurrentUser());
+ }
+
+ final Configuration conf = this.getConf();
+
+ rmAdminProxy = user.doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public ResourceManagerAdministrationProtocol run()
+ throws Exception {
+ return ClientRMProxy.createRMProxy(conf,
+ ResourceManagerAdministrationProtocol.class);
+ }
+ });
+ } catch (IOException e) {
+ String message = "Error while creating Router RMAdmin Service for user:";
+ if (user != null) {
+ message += ", user: " + user;
+ }
+
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setNextInterceptor(RMAdminRequestInterceptor next) {
+ throw new YarnRuntimeException("setNextInterceptor is being called on "
+ + "DefaultRMAdminRequestInterceptor, which should be the last one "
+ + "in the chain. Check if the interceptor pipeline configuration "
+ + "is correct");
+ }
+
+ @VisibleForTesting
+ public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
+ this.rmAdminProxy = rmAdmin;
+
+ }
+
+ @Override
+ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+ throws StandbyException, YarnException, IOException {
+ return rmAdminProxy.refreshQueues(request);
+ }
+
+ @Override
+ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+ throws StandbyException, YarnException, IOException {
+ return rmAdminProxy.refreshNodes(request);
+ }
+
+ @Override
+ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+ RefreshSuperUserGroupsConfigurationRequest request)
+ throws StandbyException, YarnException, IOException {
+ return rmAdminProxy.refreshSuperUserGroupsConfiguration(request);
+ }
+
+ @Override
+ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+ RefreshUserToGroupsMappingsRequest request)
+ throws StandbyException, YarnException, IOException {
+ return rmAdminProxy.refreshUserToGroupsMappings(request);
+ }
+
+ @Override
+ public RefreshAdminAclsResponse refreshAdminAcls(
+ RefreshAdminAclsRequest request) throws YarnException, IOException {
+ return rmAdminProxy.refreshAdminAcls(request);
+ }
+
+ @Override
+ public RefreshServiceAclsResponse refreshServiceAcls(
+ RefreshServiceAclsRequest request) throws YarnException, IOException {
+ return rmAdminProxy.refreshServiceAcls(request);
+ }
+
+ @Override
+ public UpdateNodeResourceResponse updateNodeResource(
+ UpdateNodeResourceRequest request) throws YarnException, IOException {
+ return rmAdminProxy.updateNodeResource(request);
+ }
+
+ @Override
+ public RefreshNodesResourcesResponse refreshNodesResources(
+ RefreshNodesResourcesRequest request) throws YarnException, IOException {
+ return rmAdminProxy.refreshNodesResources(request);
+ }
+
+ @Override
+ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+ AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return rmAdminProxy.addToClusterNodeLabels(request);
+ }
+
+ @Override
+ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+ RemoveFromClusterNodeLabelsRequest request)
+ throws YarnException, IOException {
+ return rmAdminProxy.removeFromClusterNodeLabels(request);
+ }
+
+ @Override
+ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+ ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+ return rmAdminProxy.replaceLabelsOnNode(request);
+ }
+
+ @Override
+ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+ CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+ throws YarnException, IOException {
+ return rmAdminProxy
+ .checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
+ }
+
+ @Override
+ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+ RefreshClusterMaxPriorityRequest request)
+ throws YarnException, IOException {
+ return rmAdminProxy.refreshClusterMaxPriority(request);
+ }
+
+ @Override
+ public String[] getGroupsForUser(String userName) throws IOException {
+ return rmAdminProxy.getGroupsForUser(userName);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java
new file mode 100644
index 00000000000..dc4bda01b90
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the client to
+ * the resource manager.
+ */
+public interface RMAdminRequestInterceptor
+ extends ResourceManagerAdministrationProtocol, Configurable {
+ /**
+ * This method is called for initializing the intercepter. This is guaranteed
+ * to be called only once in the lifetime of this instance.
+ *
+ * @param user the name of the client
+ */
+ void init(String user);
+
+ /**
+ * This method is called to release the resources held by the intercepter.
+ * This will be called when the application pipeline is being destroyed. The
+ * concrete implementations should dispose the resources and forward the
+ * request to the next intercepter, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next intercepter in the pipeline. The concrete implementation of
+ * this interface should always pass the request to the nextInterceptor after
+ * inspecting the message. The last intercepter in the chain is responsible to
+ * send the messages to the resource manager service and so the last
+ * intercepter will not receive this method call.
+ *
+ * @param nextInterceptor the RMAdminRequestInterceptor to set in the pipeline
+ */
+ void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor);
+
+ /**
+ * Returns the next intercepter in the chain.
+ *
+ * @return the next intercepter in the chain
+ */
+ RMAdminRequestInterceptor getNextInterceptor();
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
new file mode 100644
index 00000000000..b8b7ad818f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RouterRMAdminService is a service that runs on each router that can be used
+ * to intercept and inspect {@code ResourceManagerAdministrationProtocol}
+ * messages from client to the cluster resource manager. It listens
+ * {@code ResourceManagerAdministrationProtocol} messages from the client and
+ * creates a request intercepting pipeline instance for each client. The
+ * pipeline is a chain of intercepter instances that can inspect and modify the
+ * request/response as needed. The main difference with AMRMProxyService is the
+ * protocol they implement.
+ */
+public class RouterRMAdminService extends AbstractService
+ implements ResourceManagerAdministrationProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterRMAdminService.class);
+
+ private Server server;
+ private InetSocketAddress listenerEndpoint;
+
+ // For each user we store an interceptors' pipeline.
+ // For performance issue we use LRU cache to keep in memory the newest ones
+ // and remove the oldest used ones.
+ private Map userPipelineMap;
+
+ public RouterRMAdminService() {
+ super(RouterRMAdminService.class.getName());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting Router RMAdmin Service");
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ this.listenerEndpoint =
+ conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
+ YarnConfiguration.ROUTER_RMADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_ROUTER_RMADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_ROUTER_RMADMIN_PORT);
+
+ int maxCacheSize =
+ conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+ YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+ this.userPipelineMap = Collections.synchronizedMap(
+ new LRUCacheHashMap(
+ maxCacheSize, true));
+
+ Configuration serverConf = new Configuration(conf);
+
+ int numWorkerThreads =
+ serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT);
+
+ this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
+ this, listenerEndpoint, serverConf, null, numWorkerThreads);
+
+ this.server.start();
+ LOG.info("Router RMAdminService listening on address: "
+ + this.server.getListenerAddress());
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping Router RMAdminService");
+ if (this.server != null) {
+ this.server.stop();
+ }
+ userPipelineMap.clear();
+ super.serviceStop();
+ }
+
+ /**
+ * Returns the comma separated intercepter class names from the configuration.
+ *
+ * @param conf
+ * @return the intercepter class names as an instance of ArrayList
+ */
+ private List getInterceptorClassNames(Configuration conf) {
+ String configuredInterceptorClassNames =
+ conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
+ YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
+
+ List interceptorClassNames = new ArrayList();
+ Collection tempList =
+ StringUtils.getStringCollection(configuredInterceptorClassNames);
+ for (String item : tempList) {
+ interceptorClassNames.add(item.trim());
+ }
+
+ return interceptorClassNames;
+ }
+
+ private RequestInterceptorChainWrapper getInterceptorChain()
+ throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ if (!userPipelineMap.containsKey(user)) {
+ initializePipeline(user);
+ }
+ return userPipelineMap.get(user);
+ }
+
+ /**
+ * Gets the Request intercepter chains for all the users.
+ *
+ * @return the request intercepter chains.
+ */
+ @VisibleForTesting
+ protected Map getPipelines() {
+ return this.userPipelineMap;
+ }
+
+ /**
+ * This method creates and returns reference of the first intercepter in the
+ * chain of request intercepter instances.
+ *
+ * @return the reference of the first intercepter in the chain
+ */
+ @VisibleForTesting
+ protected RMAdminRequestInterceptor createRequestInterceptorChain() {
+ Configuration conf = getConfig();
+
+ List interceptorClassNames = getInterceptorClassNames(conf);
+
+ RMAdminRequestInterceptor pipeline = null;
+ RMAdminRequestInterceptor current = null;
+ for (String interceptorClassName : interceptorClassNames) {
+ try {
+ Class> interceptorClass = conf.getClassByName(interceptorClassName);
+ if (RMAdminRequestInterceptor.class
+ .isAssignableFrom(interceptorClass)) {
+ RMAdminRequestInterceptor interceptorInstance =
+ (RMAdminRequestInterceptor) ReflectionUtils
+ .newInstance(interceptorClass, conf);
+ if (pipeline == null) {
+ pipeline = interceptorInstance;
+ current = interceptorInstance;
+ continue;
+ } else {
+ current.setNextInterceptor(interceptorInstance);
+ current = interceptorInstance;
+ }
+ } else {
+ throw new YarnRuntimeException(
+ "Class: " + interceptorClassName + " not instance of "
+ + RMAdminRequestInterceptor.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate RMAdminRequestInterceptor: "
+ + interceptorClassName,
+ e);
+ }
+ }
+
+ if (pipeline == null) {
+ throw new YarnRuntimeException(
+ "RequestInterceptor pipeline is not configured in the system");
+ }
+ return pipeline;
+ }
+
+ /**
+ * Initializes the request intercepter pipeline for the specified user.
+ *
+ * @param user
+ */
+ private void initializePipeline(String user) {
+ RequestInterceptorChainWrapper chainWrapper = null;
+ synchronized (this.userPipelineMap) {
+ if (this.userPipelineMap.containsKey(user)) {
+ LOG.info("Request to start an already existing user: {}"
+ + " was received, so ignoring.", user);
+ return;
+ }
+
+ chainWrapper = new RequestInterceptorChainWrapper();
+ this.userPipelineMap.put(user, chainWrapper);
+ }
+
+ // We register the pipeline instance in the map first and then initialize it
+ // later because chain initialization can be expensive and we would like to
+ // release the lock as soon as possible to prevent other applications from
+ // blocking when one application's chain is initializing
+ LOG.info("Initializing request processing pipeline for the user: {}", user);
+
+ try {
+ RMAdminRequestInterceptor interceptorChain =
+ this.createRequestInterceptorChain();
+ interceptorChain.init(user);
+ chainWrapper.init(interceptorChain);
+ } catch (Exception e) {
+ synchronized (this.userPipelineMap) {
+ this.userPipelineMap.remove(user);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Private structure for encapsulating RequestInterceptor and user instances.
+ *
+ */
+ @Private
+ public static class RequestInterceptorChainWrapper {
+ private RMAdminRequestInterceptor rootInterceptor;
+
+ /**
+ * Initializes the wrapper with the specified parameters.
+ *
+ * @param interceptor the first interceptor in the pipeline
+ */
+ public synchronized void init(RMAdminRequestInterceptor interceptor) {
+ this.rootInterceptor = interceptor;
+ }
+
+ /**
+ * Gets the root request intercepter.
+ *
+ * @return the root request intercepter
+ */
+ public synchronized RMAdminRequestInterceptor getRootInterceptor() {
+ return rootInterceptor;
+ }
+
+ /**
+ * Shutdown the chain of interceptors when the object is destroyed.
+ */
+ @Override
+ protected void finalize() {
+ rootInterceptor.shutdown();
+ }
+ }
+
+ @Override
+ public String[] getGroupsForUser(String user) throws IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().getGroupsForUser(user);
+ }
+
+ @Override
+ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+ throws StandbyException, YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshQueues(request);
+
+ }
+
+ @Override
+ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+ throws StandbyException, YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshNodes(request);
+
+ }
+
+ @Override
+ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+ RefreshSuperUserGroupsConfigurationRequest request)
+ throws StandbyException, YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor()
+ .refreshSuperUserGroupsConfiguration(request);
+
+ }
+
+ @Override
+ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+ RefreshUserToGroupsMappingsRequest request)
+ throws StandbyException, YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshUserToGroupsMappings(request);
+
+ }
+
+ @Override
+ public RefreshAdminAclsResponse refreshAdminAcls(
+ RefreshAdminAclsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshAdminAcls(request);
+
+ }
+
+ @Override
+ public RefreshServiceAclsResponse refreshServiceAcls(
+ RefreshServiceAclsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshServiceAcls(request);
+
+ }
+
+ @Override
+ public UpdateNodeResourceResponse updateNodeResource(
+ UpdateNodeResourceRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().updateNodeResource(request);
+
+ }
+
+ @Override
+ public RefreshNodesResourcesResponse refreshNodesResources(
+ RefreshNodesResourcesRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshNodesResources(request);
+
+ }
+
+ @Override
+ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+ AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().addToClusterNodeLabels(request);
+
+ }
+
+ @Override
+ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+ RemoveFromClusterNodeLabelsRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().removeFromClusterNodeLabels(request);
+
+ }
+
+ @Override
+ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+ ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().replaceLabelsOnNode(request);
+
+ }
+
+ @Override
+ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+ CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor()
+ .checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
+ }
+
+ @Override
+ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+ RefreshClusterMaxPriorityRequest request)
+ throws YarnException, IOException {
+ RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+ return pipeline.getRootInterceptor().refreshClusterMaxPriority(request);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java
new file mode 100644
index 00000000000..98a7ed0841c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router RM Admin Proxy Service package. **/
+package org.apache.hadoop.yarn.server.router.rmadmin;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
index a283a624aa6..7e1508493ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
@@ -133,7 +133,7 @@ public abstract class BaseRouterClientRMTest {
+ "," + mockPassThroughInterceptorClass + ","
+ MockClientRequestInterceptor.class.getName());
- this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
+ this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
this.dispatcher = new AsyncDispatcher();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java
new file mode 100644
index 00000000000..d3eba618028
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the RouterRMAdminService test cases. It provides utility
+ * methods that can be used by the concrete test case classes.
+ *
+ */
+public abstract class BaseRouterRMAdminTest {
+
+ /**
+ * The RouterRMAdminService instance that will be used by all the test cases.
+ */
+ private MockRouterRMAdminService rmAdminService;
+ /**
+ * Thread pool used for asynchronous operations.
+ */
+ private static ExecutorService threadpool = Executors.newCachedThreadPool();
+ private Configuration conf;
+ private AsyncDispatcher dispatcher;
+
+ public final static int TEST_MAX_CACHE_SIZE = 10;
+
+ protected MockRouterRMAdminService getRouterRMAdminService() {
+ Assert.assertNotNull(this.rmAdminService);
+ return this.rmAdminService;
+ }
+
+ @Before
+ public void setUp() {
+ this.conf = new YarnConfiguration();
+ String mockPassThroughInterceptorClass =
+ PassThroughRMAdminRequestInterceptor.class.getName();
+
+ // Create a request intercepter pipeline for testing. The last one in the
+ // chain will call the mock resource manager. The others in the chain will
+ // simply forward it to the next one in the chain
+ this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ + "," + mockPassThroughInterceptorClass + ","
+ + MockRMAdminRequestInterceptor.class.getName());
+
+ this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+ TEST_MAX_CACHE_SIZE);
+
+ this.dispatcher = new AsyncDispatcher();
+ this.dispatcher.init(conf);
+ this.dispatcher.start();
+ this.rmAdminService = createAndStartRouterRMAdminService();
+ }
+
+ @After
+ public void tearDown() {
+ if (rmAdminService != null) {
+ rmAdminService.stop();
+ rmAdminService = null;
+ }
+ if (this.dispatcher != null) {
+ this.dispatcher.stop();
+ }
+ }
+
+ protected ExecutorService getThreadPool() {
+ return threadpool;
+ }
+
+ protected MockRouterRMAdminService createAndStartRouterRMAdminService() {
+ MockRouterRMAdminService svc = new MockRouterRMAdminService();
+ svc.init(conf);
+ svc.start();
+ return svc;
+ }
+
+ protected static class MockRouterRMAdminService extends RouterRMAdminService {
+ public MockRouterRMAdminService() {
+ super();
+ }
+ }
+
+ protected RefreshQueuesResponse refreshQueues(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RefreshQueuesResponse run() throws Exception {
+ RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
+ RefreshQueuesResponse response =
+ getRouterRMAdminService().refreshQueues(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshNodesResponse refreshNodes(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RefreshNodesResponse run() throws Exception {
+ RefreshNodesRequest req = RefreshNodesRequest.newInstance();
+ RefreshNodesResponse response =
+ getRouterRMAdminService().refreshNodes(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+ String user) throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public RefreshSuperUserGroupsConfigurationResponse run()
+ throws Exception {
+ RefreshSuperUserGroupsConfigurationRequest req =
+ RefreshSuperUserGroupsConfigurationRequest.newInstance();
+ RefreshSuperUserGroupsConfigurationResponse response =
+ getRouterRMAdminService()
+ .refreshSuperUserGroupsConfiguration(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+ String user) throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public RefreshUserToGroupsMappingsResponse run() throws Exception {
+ RefreshUserToGroupsMappingsRequest req =
+ RefreshUserToGroupsMappingsRequest.newInstance();
+ RefreshUserToGroupsMappingsResponse response =
+ getRouterRMAdminService().refreshUserToGroupsMappings(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshAdminAclsResponse refreshAdminAcls(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RefreshAdminAclsResponse run() throws Exception {
+ RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
+ RefreshAdminAclsResponse response =
+ getRouterRMAdminService().refreshAdminAcls(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshServiceAclsResponse refreshServiceAcls(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RefreshServiceAclsResponse run() throws Exception {
+ RefreshServiceAclsRequest req =
+ RefreshServiceAclsRequest.newInstance();
+ RefreshServiceAclsResponse response =
+ getRouterRMAdminService().refreshServiceAcls(req);
+ return response;
+ }
+ });
+ }
+
+ protected UpdateNodeResourceResponse updateNodeResource(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public UpdateNodeResourceResponse run() throws Exception {
+ UpdateNodeResourceRequest req =
+ UpdateNodeResourceRequest.newInstance(null);
+ UpdateNodeResourceResponse response =
+ getRouterRMAdminService().updateNodeResource(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshNodesResourcesResponse refreshNodesResources(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public RefreshNodesResourcesResponse run() throws Exception {
+ RefreshNodesResourcesRequest req =
+ RefreshNodesResourcesRequest.newInstance();
+ RefreshNodesResourcesResponse response =
+ getRouterRMAdminService().refreshNodesResources(req);
+ return response;
+ }
+ });
+ }
+
+ protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public AddToClusterNodeLabelsResponse run() throws Exception {
+ AddToClusterNodeLabelsRequest req =
+ AddToClusterNodeLabelsRequest.newInstance(null);
+ AddToClusterNodeLabelsResponse response =
+ getRouterRMAdminService().addToClusterNodeLabels(req);
+ return response;
+ }
+ });
+ }
+
+ protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+ String user) throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public RemoveFromClusterNodeLabelsResponse run() throws Exception {
+ RemoveFromClusterNodeLabelsRequest req =
+ RemoveFromClusterNodeLabelsRequest.newInstance(null);
+ RemoveFromClusterNodeLabelsResponse response =
+ getRouterRMAdminService().removeFromClusterNodeLabels(req);
+ return response;
+ }
+ });
+ }
+
+ protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ReplaceLabelsOnNodeResponse run() throws Exception {
+ ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
+ .newInstance(new HashMap>());
+ ReplaceLabelsOnNodeResponse response =
+ getRouterRMAdminService().replaceLabelsOnNode(req);
+ return response;
+ }
+ });
+ }
+
+ protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+ String user) throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public CheckForDecommissioningNodesResponse run() throws Exception {
+ CheckForDecommissioningNodesRequest req =
+ CheckForDecommissioningNodesRequest.newInstance();
+ CheckForDecommissioningNodesResponse response =
+ getRouterRMAdminService().checkForDecommissioningNodes(req);
+ return response;
+ }
+ });
+ }
+
+ protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+ String user) throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user).doAs(
+ new PrivilegedExceptionAction() {
+ @Override
+ public RefreshClusterMaxPriorityResponse run() throws Exception {
+ RefreshClusterMaxPriorityRequest req =
+ RefreshClusterMaxPriorityRequest.newInstance();
+ RefreshClusterMaxPriorityResponse response =
+ getRouterRMAdminService().refreshClusterMaxPriority(req);
+ return response;
+ }
+ });
+ }
+
+ protected String[] getGroupsForUser(String user)
+ throws IOException, InterruptedException {
+ return UserGroupInformation.createRemoteUser(user)
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public String[] run() throws Exception {
+ String[] response =
+ getRouterRMAdminService().getGroupsForUser(user);
+ return response;
+ }
+ });
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java
new file mode 100644
index 00000000000..ab7bdb41ed2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+
+/**
+ * This class mocks the RMAmdinRequestInterceptor.
+ */
+public class MockRMAdminRequestInterceptor
+ extends DefaultRMAdminRequestInterceptor {
+
+ public void init(String user) {
+ MockResourceManagerFacade mockRM = new MockResourceManagerFacade(
+ new YarnConfiguration(super.getConf()), 0);
+ super.setRMAdmin(mockRM);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
new file mode 100644
index 00000000000..38dcc3d96d3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain.
+ */
+public class PassThroughRMAdminRequestInterceptor
+ extends AbstractRMAdminRequestInterceptor {
+
+ @Override
+ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+ throws StandbyException, YarnException, IOException {
+ return getNextInterceptor().refreshQueues(request);
+ }
+
+ @Override
+ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+ throws StandbyException, YarnException, IOException {
+ return getNextInterceptor().refreshNodes(request);
+ }
+
+ @Override
+ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+ RefreshSuperUserGroupsConfigurationRequest request)
+ throws StandbyException, YarnException, IOException {
+ return getNextInterceptor().refreshSuperUserGroupsConfiguration(request);
+ }
+
+ @Override
+ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+ RefreshUserToGroupsMappingsRequest request)
+ throws StandbyException, YarnException, IOException {
+ return getNextInterceptor().refreshUserToGroupsMappings(request);
+ }
+
+ @Override
+ public RefreshAdminAclsResponse refreshAdminAcls(
+ RefreshAdminAclsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().refreshAdminAcls(request);
+ }
+
+ @Override
+ public RefreshServiceAclsResponse refreshServiceAcls(
+ RefreshServiceAclsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().refreshServiceAcls(request);
+ }
+
+ @Override
+ public UpdateNodeResourceResponse updateNodeResource(
+ UpdateNodeResourceRequest request) throws YarnException, IOException {
+ return getNextInterceptor().updateNodeResource(request);
+ }
+
+ @Override
+ public RefreshNodesResourcesResponse refreshNodesResources(
+ RefreshNodesResourcesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().refreshNodesResources(request);
+ }
+
+ @Override
+ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+ AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().addToClusterNodeLabels(request);
+ }
+
+ @Override
+ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+ RemoveFromClusterNodeLabelsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().removeFromClusterNodeLabels(request);
+ }
+
+ @Override
+ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+ ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+ return getNextInterceptor().replaceLabelsOnNode(request);
+ }
+
+ @Override
+ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+ CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+ throws YarnException, IOException {
+ return getNextInterceptor()
+ .checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
+ }
+
+ @Override
+ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+ RefreshClusterMaxPriorityRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().refreshClusterMaxPriority(request);
+ }
+
+ @Override
+ public String[] getGroupsForUser(String user) throws IOException {
+ return getNextInterceptor().getGroupsForUser(user);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java
new file mode 100644
index 00000000000..11786e6f980
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java
@@ -0,0 +1,219 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService.RequestInterceptorChainWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test class to validate the RMAdmin Service inside the Router.
+ */
+public class TestRouterRMAdminService extends BaseRouterRMAdminTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterRMAdminService.class);
+
+ /**
+ * Tests if the pipeline is created properly.
+ */
+ @Test
+ public void testRequestInterceptorChainCreation() throws Exception {
+ RMAdminRequestInterceptor root =
+ super.getRouterRMAdminService().createRequestInterceptorChain();
+ int index = 0;
+ while (root != null) {
+ // The current pipeline is:
+ // PassThroughRMAdminRequestInterceptor - index = 0
+ // PassThroughRMAdminRequestInterceptor - index = 1
+ // PassThroughRMAdminRequestInterceptor - index = 2
+ // MockClientRequestInterceptor - index = 3
+ switch (index) {
+ case 0: // Fall to the next case
+ case 1: // Fall to the next case
+ case 2:
+ // If index is equal to 0,1 or 2 we fall in this check
+ Assert.assertEquals(
+ PassThroughRMAdminRequestInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ case 3:
+ Assert.assertEquals(MockRMAdminRequestInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ default:
+ Assert.fail();
+ }
+ root = root.getNextInterceptor();
+ index++;
+ }
+ Assert.assertEquals("The number of interceptors in chain does not match", 4,
+ index);
+ }
+
+ /**
+ * Test if the RouterRMAdmin forwards all the requests to the MockRM and get
+ * back the responses.
+ */
+ @Test
+ public void testRouterRMAdminServiceE2E() throws Exception {
+
+ String user = "test1";
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Queues");
+
+ RefreshQueuesResponse responseRefreshQueues = refreshQueues(user);
+ Assert.assertNotNull(responseRefreshQueues);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes");
+
+ RefreshNodesResponse responseRefreshNodes = refreshNodes(user);
+ Assert.assertNotNull(responseRefreshNodes);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Super User");
+
+ RefreshSuperUserGroupsConfigurationResponse responseRefreshSuperUser =
+ refreshSuperUserGroupsConfiguration(user);
+ Assert.assertNotNull(responseRefreshSuperUser);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh User to Group");
+
+ RefreshUserToGroupsMappingsResponse responseRefreshUserToGroup =
+ refreshUserToGroupsMappings(user);
+ Assert.assertNotNull(responseRefreshUserToGroup);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Admin Acls");
+
+ RefreshAdminAclsResponse responseRefreshAdminAcls = refreshAdminAcls(user);
+ Assert.assertNotNull(responseRefreshAdminAcls);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Service Acls");
+
+ RefreshServiceAclsResponse responseRefreshServiceAcls =
+ refreshServiceAcls(user);
+ Assert.assertNotNull(responseRefreshServiceAcls);
+
+ LOG.info("testRouterRMAdminServiceE2E - Update Node Resource");
+
+ UpdateNodeResourceResponse responseUpdateNodeResource =
+ updateNodeResource(user);
+ Assert.assertNotNull(responseUpdateNodeResource);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes Resource");
+
+ RefreshNodesResourcesResponse responseRefreshNodesResources =
+ refreshNodesResources(user);
+ Assert.assertNotNull(responseRefreshNodesResources);
+
+ LOG.info("testRouterRMAdminServiceE2E - Add To Cluster NodeLabels");
+
+ AddToClusterNodeLabelsResponse responseAddToClusterNodeLabels =
+ addToClusterNodeLabels(user);
+ Assert.assertNotNull(responseAddToClusterNodeLabels);
+
+ LOG.info("testRouterRMAdminServiceE2E - Remove To Cluster NodeLabels");
+
+ RemoveFromClusterNodeLabelsResponse responseRemoveFromClusterNodeLabels =
+ removeFromClusterNodeLabels(user);
+ Assert.assertNotNull(responseRemoveFromClusterNodeLabels);
+
+ LOG.info("testRouterRMAdminServiceE2E - Replace Labels On Node");
+
+ ReplaceLabelsOnNodeResponse responseReplaceLabelsOnNode =
+ replaceLabelsOnNode(user);
+ Assert.assertNotNull(responseReplaceLabelsOnNode);
+
+ LOG.info("testRouterRMAdminServiceE2E - Check For Decommissioning Nodes");
+
+ CheckForDecommissioningNodesResponse responseCheckForDecom =
+ checkForDecommissioningNodes(user);
+ Assert.assertNotNull(responseCheckForDecom);
+
+ LOG.info("testRouterRMAdminServiceE2E - Refresh Cluster Max Priority");
+
+ RefreshClusterMaxPriorityResponse responseRefreshClusterMaxPriority =
+ refreshClusterMaxPriority(user);
+ Assert.assertNotNull(responseRefreshClusterMaxPriority);
+
+ LOG.info("testRouterRMAdminServiceE2E - Get Groups For User");
+
+ String[] responseGetGroupsForUser = getGroupsForUser(user);
+ Assert.assertNotNull(responseGetGroupsForUser);
+
+ }
+
+ /**
+ * Test if the different chains for users are generated, and LRU cache is
+ * working as expected.
+ */
+ @Test
+ public void testUsersChainMapWithLRUCache()
+ throws YarnException, IOException, InterruptedException {
+
+ Map pipelines;
+ RequestInterceptorChainWrapper chain;
+
+ refreshQueues("test1");
+ refreshQueues("test2");
+ refreshQueues("test3");
+ refreshQueues("test4");
+ refreshQueues("test5");
+ refreshQueues("test6");
+ refreshQueues("test7");
+ refreshQueues("test8");
+
+ pipelines = super.getRouterRMAdminService().getPipelines();
+ Assert.assertEquals(8, pipelines.size());
+
+ refreshQueues("test9");
+ refreshQueues("test10");
+ refreshQueues("test1");
+ refreshQueues("test11");
+
+ // The cache max size is defined in
+ // BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE
+ Assert.assertEquals(10, pipelines.size());
+
+ chain = pipelines.get("test1");
+ Assert.assertNotNull("test1 should not be evicted", chain);
+
+ chain = pipelines.get("test2");
+ Assert.assertNull("test2 should have been evicted", chain);
+ }
+
+}