diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 4c4298d3241..73f1038f77e 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -567,4 +567,11 @@
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
new file mode 100644
index 00000000000..5f82d69479f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor and provides an implementation for
+ * federation of YARN RM and scaling an application across multiple YARN
+ * sub-clusters. All the federation specific implementation is encapsulated in
+ * this class. This is always the last intercepter in the chain.
+ */
+public class FederationInterceptor extends AbstractRequestInterceptor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationInterceptor.class);
+
+ /**
+ * The home sub-cluster is the sub-cluster where the AM container is running
+ * in.
+ */
+ private ApplicationMasterProtocol homeRM;
+ private SubClusterId homeSubClusterId;
+
+ /**
+ * Used to keep track of the container Id and the sub cluster RM that created
+ * the container, so that we know which sub-cluster to forward later requests
+ * about existing containers to.
+ */
+ private Map containerIdToSubClusterIdMap;
+
+ /**
+ * The original registration request that was sent by the AM. This instance is
+ * reused to register/re-register with all the sub-cluster RMs.
+ */
+ private RegisterApplicationMasterRequest amRegistrationRequest;
+
+ /**
+ * The original registration response from home RM. This instance is reused
+ * for duplicate register request from AM, triggered by timeout between AM and
+ * AMRMProxy.
+ */
+ private RegisterApplicationMasterResponse amRegistrationResponse;
+
+ /** The proxy ugi used to talk to home RM. */
+ private UserGroupInformation appOwner;
+
+ /**
+ * Creates an instance of the FederationInterceptor class.
+ */
+ public FederationInterceptor() {
+ this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
+ this.amRegistrationResponse = null;
+ }
+
+ /**
+ * Initializes the instance using specified context.
+ */
+ @Override
+ public void init(AMRMProxyApplicationContext appContext) {
+ super.init(appContext);
+ LOG.info("Initializing Federation Interceptor");
+
+ // Update the conf if available
+ Configuration conf = appContext.getConf();
+ if (conf == null) {
+ conf = getConf();
+ } else {
+ setConf(conf);
+ }
+
+ try {
+ this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
+ UserGroupInformation.getCurrentUser());
+ } catch (Exception ex) {
+ throw new YarnRuntimeException(ex);
+ }
+
+ this.homeSubClusterId =
+ SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
+ this.homeRM = createHomeRMProxy(appContext);
+ }
+
+ /**
+ * Sends the application master's registration request to the home RM.
+ *
+ * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
+ * so that when AM registers more than once, it returns the same register
+ * success response instead of throwing
+ * {@link InvalidApplicationMasterRequestException}. Furthermore, we present
+ * to AM as if we are the RM that never fails over. When actual RM fails over,
+ * we always re-register automatically.
+ *
+ * We did this because FederationInterceptor can receive concurrent register
+ * requests from AM because of timeout between AM and AMRMProxy, which is
+ * shorter than the timeout + failOver between FederationInterceptor
+ * (AMRMProxy) and RM.
+ */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ // If AM is calling with a different request, complain
+ if (this.amRegistrationRequest != null
+ && !this.amRegistrationRequest.equals(request)) {
+ throw new YarnException("A different request body recieved. AM should"
+ + " not call registerApplicationMaster with different request body");
+ }
+
+ // Save the registration request. This will be used for registering with
+ // secondary sub-clusters using UAMs, as well as re-register later
+ this.amRegistrationRequest = request;
+
+ /*
+ * Present to AM as if we are the RM that never fails over. When actual RM
+ * fails over, we always re-register automatically.
+ *
+ * We did this because it is possible for AM to send duplicate register
+ * request because of timeout. When it happens, it is fine to simply return
+ * the success message. Out of all outstanding register threads, only the
+ * last one will still have an unbroken RPC connection and successfully
+ * return the response.
+ */
+ if (this.amRegistrationResponse != null) {
+ return this.amRegistrationResponse;
+ }
+
+ /*
+ * Send a registration request to the home resource manager. Note that here
+ * we don't register with other sub-cluster resource managers because that
+ * will prevent us from using new sub-clusters that get added while the AM
+ * is running and will breaks the elasticity feature. The registration with
+ * the other sub-cluster RM will be done lazily as needed later.
+ */
+ try {
+ this.amRegistrationResponse =
+ this.homeRM.registerApplicationMaster(request);
+ } catch (InvalidApplicationMasterRequestException e) {
+ if (e.getMessage()
+ .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
+ // Some other register thread might have succeeded in the meantime
+ if (this.amRegistrationResponse != null) {
+ LOG.info("Other concurrent thread registered successfully, "
+ + "simply return the same success register response");
+ return this.amRegistrationResponse;
+ }
+ }
+ // This is a real issue, throw back to AM
+ throw e;
+ }
+
+ // the queue this application belongs will be used for getting
+ // AMRMProxy policy from state store.
+ String queue = this.amRegistrationResponse.getQueue();
+ if (queue == null) {
+ LOG.warn("Received null queue for application "
+ + getApplicationContext().getApplicationAttemptId().getApplicationId()
+ + " from home subcluster. Will use default queue name "
+ + YarnConfiguration.DEFAULT_QUEUE_NAME
+ + " for getting AMRMProxyPolicy");
+ } else {
+ LOG.info("Application "
+ + getApplicationContext().getApplicationAttemptId().getApplicationId()
+ + " belongs to queue " + queue);
+ }
+
+ return this.amRegistrationResponse;
+ }
+
+ /**
+ * Sends the heart beats to the home RM and the secondary sub-cluster RMs that
+ * are being used by the application.
+ */
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException {
+
+ try {
+ // Split the heart beat request into multiple requests, one for each
+ // sub-cluster RM that is used by this application.
+ Map requests =
+ splitAllocateRequest(request);
+
+ // Send the request to the home RM and get the response
+ AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
+ requests.get(this.homeSubClusterId), this.homeRM,
+ this.amRegistrationRequest,
+ getApplicationContext().getApplicationAttemptId());
+
+ // If the resource manager sent us a new token, add to the current user
+ if (homeResponse.getAMRMToken() != null) {
+ LOG.debug("Received new AMRMToken");
+ YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
+ this.appOwner, getConf());
+ }
+
+ // Merge the responses from home and secondary sub-cluster RMs
+ homeResponse = mergeAllocateResponses(homeResponse);
+
+ // return the final response to the application master.
+ return homeResponse;
+ } catch (IOException ex) {
+ LOG.error("Exception encountered while processing heart beat", ex);
+ throw new YarnException(ex);
+ }
+ }
+
+ /**
+ * Sends the finish application master request to all the resource managers
+ * used by the application.
+ */
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnException, IOException {
+
+ FinishApplicationMasterResponse homeResponse =
+ AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
+ this.amRegistrationRequest,
+ getApplicationContext().getApplicationAttemptId());
+ return homeResponse;
+ }
+
+ @Override
+ public void setNextInterceptor(RequestInterceptor next) {
+ throw new YarnRuntimeException(
+ "setNextInterceptor is being called on FederationInterceptor. "
+ + "It should always be used as the last interceptor in the chain");
+ }
+
+ /**
+ * This is called when the application pipeline is being destroyed. We will
+ * release all the resources that we are holding in this call.
+ */
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+
+ /**
+ * Returns instance of the ApplicationMasterProtocol proxy class that is used
+ * to connect to the Home resource manager.
+ *
+ * @param appContext AMRMProxyApplicationContext
+ * @return the proxy created
+ */
+ protected ApplicationMasterProtocol createHomeRMProxy(
+ AMRMProxyApplicationContext appContext) {
+ try {
+ return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
+ ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner,
+ appContext.getAMRMToken());
+ } catch (Exception ex) {
+ throw new YarnRuntimeException(ex);
+ }
+ }
+
+ /**
+ * In federation, the heart beat request needs to be sent to all the sub
+ * clusters from which the AM has requested containers. This method splits the
+ * specified AllocateRequest from the AM and creates a new request for each
+ * sub-cluster RM.
+ */
+ private Map splitAllocateRequest(
+ AllocateRequest request) throws YarnException {
+ Map requestMap =
+ new HashMap();
+
+ // Create heart beat request for home sub-cluster resource manager
+ findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
+ requestMap);
+
+ if (!isNullOrEmpty(request.getAskList())) {
+ AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+ this.homeSubClusterId, request, requestMap);
+ newRequest.getAskList().addAll(request.getAskList());
+ }
+
+ if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
+ request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+ for (String resourceName : request.getResourceBlacklistRequest()
+ .getBlacklistAdditions()) {
+ AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+ this.homeSubClusterId, request, requestMap);
+ newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+ .add(resourceName);
+ }
+ }
+
+ if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
+ request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+ for (String resourceName : request.getResourceBlacklistRequest()
+ .getBlacklistRemovals()) {
+ AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+ this.homeSubClusterId, request, requestMap);
+ newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+ .add(resourceName);
+ }
+ }
+
+ if (!isNullOrEmpty(request.getReleaseList())) {
+ for (ContainerId cid : request.getReleaseList()) {
+ if (warnIfNotExists(cid, "release")) {
+ SubClusterId subClusterId =
+ this.containerIdToSubClusterIdMap.get(cid);
+ AllocateRequest newRequest = requestMap.get(subClusterId);
+ newRequest.getReleaseList().add(cid);
+ }
+ }
+ }
+
+ if (!isNullOrEmpty(request.getUpdateRequests())) {
+ for (UpdateContainerRequest ucr : request.getUpdateRequests()) {
+ if (warnIfNotExists(ucr.getContainerId(), "update")) {
+ SubClusterId subClusterId =
+ this.containerIdToSubClusterIdMap.get(ucr.getContainerId());
+ AllocateRequest newRequest = requestMap.get(subClusterId);
+ newRequest.getUpdateRequests().add(ucr);
+ }
+ }
+ }
+
+ return requestMap;
+ }
+
+ /**
+ * Merges the responses from other sub-clusters that we received
+ * asynchronously with the specified home cluster response and keeps track of
+ * the containers received from each sub-cluster resource managers.
+ */
+ private AllocateResponse mergeAllocateResponses(
+ AllocateResponse homeResponse) {
+ // Timing issue, we need to remove the completed and then save the new ones.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove containers: "
+ + homeResponse.getCompletedContainersStatuses());
+ LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
+ }
+ removeFinishedContainersFromCache(
+ homeResponse.getCompletedContainersStatuses());
+ cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
+ this.homeSubClusterId);
+
+ return homeResponse;
+ }
+
+ /**
+ * Removes the finished containers from the local cache.
+ */
+ private void removeFinishedContainersFromCache(
+ List finishedContainers) {
+ for (ContainerStatus container : finishedContainers) {
+ if (containerIdToSubClusterIdMap
+ .containsKey(container.getContainerId())) {
+ containerIdToSubClusterIdMap.remove(container.getContainerId());
+ }
+ }
+ }
+
+ /**
+ * Add allocated containers to cache mapping.
+ */
+ private void cacheAllocatedContainers(List containers,
+ SubClusterId subClusterId) {
+ for (Container container : containers) {
+ if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
+ SubClusterId existingSubClusterId =
+ containerIdToSubClusterIdMap.get(container.getId());
+ if (existingSubClusterId.equals(subClusterId)) {
+ // When RM fails over, the new RM master might send out the same
+ // container allocation more than once. Just move on in this case.
+ LOG.warn(
+ "Duplicate containerID: {} found in the allocated containers"
+ + " from same subcluster: {}, so ignoring.",
+ container.getId(), subClusterId);
+ } else {
+ // The same container allocation from different subclusters,
+ // something is wrong.
+ // TODO: YARN-6667 if some subcluster RM is configured wrong, we
+ // should not fail the entire heartbeat.
+ throw new YarnRuntimeException(
+ "Duplicate containerID found in the allocated containers. This"
+ + " can happen if the RM epoch is not configured properly."
+ + " ContainerId: " + container.getId().toString()
+ + " ApplicationId: "
+ + getApplicationContext().getApplicationAttemptId()
+ + " From RM: " + subClusterId
+ + " . Previous container was from subcluster: "
+ + existingSubClusterId);
+ }
+ }
+
+ containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+ }
+ }
+
+ /**
+ * Check to see if an AllocateRequest exists in the Map for the specified sub
+ * cluster. If not found, create a new one, copy the value of responseId and
+ * progress from the orignialAMRequest, save it in the specified Map and
+ * return the new instance. If found, just return the old instance.
+ */
+ private static AllocateRequest findOrCreateAllocateRequestForSubCluster(
+ SubClusterId subClusterId, AllocateRequest originalAMRequest,
+ Map requestMap) {
+ AllocateRequest newRequest = null;
+ if (requestMap.containsKey(subClusterId)) {
+ newRequest = requestMap.get(subClusterId);
+ } else {
+ newRequest = createAllocateRequest();
+ newRequest.setResponseId(originalAMRequest.getResponseId());
+ newRequest.setProgress(originalAMRequest.getProgress());
+ requestMap.put(subClusterId, newRequest);
+ }
+
+ return newRequest;
+ }
+
+ /**
+ * Create an empty AllocateRequest instance.
+ */
+ private static AllocateRequest createAllocateRequest() {
+ AllocateRequest request =
+ AllocateRequest.newInstance(0, 0, null, null, null);
+ request.setAskList(new ArrayList());
+ request.setReleaseList(new ArrayList());
+ ResourceBlacklistRequest blackList =
+ ResourceBlacklistRequest.newInstance(null, null);
+ blackList.setBlacklistAdditions(new ArrayList());
+ blackList.setBlacklistRemovals(new ArrayList());
+ request.setResourceBlacklistRequest(blackList);
+ request.setUpdateRequests(new ArrayList());
+ return request;
+ }
+
+ /**
+ * Check to see if the specified containerId exists in the cache and log an
+ * error if not found.
+ *
+ * @param containerId the container id
+ * @param actionName the name of the action
+ * @return true if the container exists in the map, false otherwise
+ */
+ private boolean warnIfNotExists(ContainerId containerId, String actionName) {
+ if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
+ LOG.error("AM is trying to {} a container {} that does not exist. ",
+ actionName, containerId.toString());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Utility method to check if the specified Collection is null or empty
+ *
+ * @param c the collection object
+ * @param element type of the collection
+ * @return whether is it is null or empty
+ */
+ public static boolean isNullOrEmpty(Collection c) {
+ return (c == null || c.size() == 0);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index cf8db07c876..937ede59d7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
new file mode 100644
index 00000000000..3b564f02a2b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the TestAMRMProxyService and overrides methods in order to use the
+ * AMRMProxyService's pipeline test cases for testing the FederationInterceptor
+ * class. The tests for AMRMProxyService has been written cleverly so that it
+ * can be reused to validate different request intercepter chains.
+ */
+public class TestFederationInterceptor extends BaseAMRMProxyTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestFederationInterceptor.class);
+
+ public static final String HOME_SC_ID = "SC-home";
+
+ private TestableFederationInterceptor interceptor;
+
+ private int testAppId;
+ private ApplicationAttemptId attemptId;
+
+ @Override
+ public void setUp() throws IOException {
+ super.setUp();
+ interceptor = new TestableFederationInterceptor();
+
+ testAppId = 1;
+ attemptId = getApplicationAttemptId(testAppId);
+ interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
+ attemptId, "test-user", null, null));
+ }
+
+ @Override
+ public void tearDown() {
+ interceptor.shutdown();
+ super.tearDown();
+ }
+
+ @Override
+ protected YarnConfiguration createConfiguration() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+ String mockPassThroughInterceptorClass =
+ PassThroughRequestInterceptor.class.getName();
+
+ // Create a request intercepter pipeline for testing. The last one in the
+ // chain is the federation intercepter that calls the mock resource manager.
+ // The others in the chain will simply forward it to the next one in the
+ // chain
+ conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ + "," + TestableFederationInterceptor.class.getName());
+
+ conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
+
+ return conf;
+ }
+
+ @Test
+ public void testRequestInterceptorChainCreation() throws Exception {
+ RequestInterceptor root =
+ super.getAMRMProxyService().createRequestInterceptorChain();
+ int index = 0;
+ while (root != null) {
+ switch (index) {
+ case 0:
+ case 1:
+ Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ case 2:
+ Assert.assertEquals(TestableFederationInterceptor.class.getName(),
+ root.getClass().getName());
+ break;
+ default:
+ Assert.fail();
+ }
+ root = root.getNextInterceptor();
+ index++;
+ }
+ Assert.assertEquals("The number of interceptors in chain does not match",
+ Integer.toString(3), Integer.toString(index));
+ }
+
+ /**
+ * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
+ * so that when AM registers more than once, it returns the same register
+ * success response instead of throwing
+ * {@link InvalidApplicationMasterRequestException}
+ *
+ * We did this because FederationInterceptor can receive concurrent register
+ * requests from AM because of timeout between AM and AMRMProxy. This can
+ * possible since the timeout between FederationInterceptor and RM longer
+ * because of performFailover + timeout.
+ */
+ @Test
+ public void testTwoIdenticalRegisterRequest() throws Exception {
+ // Register the application twice
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ for (int i = 0; i < 2; i++) {
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+ }
+ }
+
+ @Test
+ public void testTwoDifferentRegisterRequest() throws Exception {
+ // Register the application first time
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+
+ // Register the application second time with a different request obj
+ registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("different");
+ try {
+ registerResponse = interceptor.registerApplicationMaster(registerReq);
+ Assert.fail("Should throw if a different request obj is used");
+ } catch (YarnException e) {
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
new file mode 100644
index 00000000000..0ca74880561
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
+import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+
+/**
+ * Extends the FederationInterceptor and overrides methods to provide a testable
+ * implementation of FederationInterceptor.
+ */
+public class TestableFederationInterceptor extends FederationInterceptor {
+ private ConcurrentHashMap
+ secondaryResourceManagers = new ConcurrentHashMap<>();
+ private AtomicInteger runningIndex = new AtomicInteger(0);
+ private MockResourceManagerFacade mockRm;
+
+ @Override
+ protected ApplicationMasterProtocol createHomeRMProxy(
+ AMRMProxyApplicationContext appContext) {
+ synchronized (this) {
+ if (mockRm == null) {
+ mockRm = new MockResourceManagerFacade(
+ new YarnConfiguration(super.getConf()), 0);
+ }
+ }
+ return mockRm;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected T createSecondaryRMProxy(Class proxyClass,
+ Configuration conf, String subClusterId) throws IOException {
+ // We create one instance of the mock resource manager per sub cluster. Keep
+ // track of the instances of the RMs in the map keyed by the sub cluster id
+ synchronized (this.secondaryResourceManagers) {
+ if (this.secondaryResourceManagers.contains(subClusterId)) {
+ return (T) this.secondaryResourceManagers.get(subClusterId);
+ } else {
+ // The running index here is used to simulate different RM_EPOCH to
+ // generate unique container identifiers in a federation environment
+ MockResourceManagerFacade rm = new MockResourceManagerFacade(
+ new Configuration(conf), runningIndex.addAndGet(10000));
+ this.secondaryResourceManagers.put(subClusterId, rm);
+ return (T) rm;
+ }
+ }
+ }
+
+ protected void setShouldReRegisterNext() {
+ if (mockRm != null) {
+ mockRm.setShouldReRegisterNext();
+ }
+ for (MockResourceManagerFacade subCluster : secondaryResourceManagers
+ .values()) {
+ subCluster.setShouldReRegisterNext();
+ }
+ }
+
+ /**
+ * Extends the UnmanagedAMPoolManager and overrides methods to provide a
+ * testable implementation of UnmanagedAMPoolManager.
+ */
+ protected class TestableUnmanagedAMPoolManager
+ extends UnmanagedAMPoolManager {
+ public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
+ super(threadpool);
+ }
+
+ @Override
+ public UnmanagedApplicationManager createUAM(Configuration conf,
+ ApplicationId appId, String queueName, String submitter,
+ String appNameSuffix) {
+ return new TestableUnmanagedApplicationManager(conf, appId, queueName,
+ submitter, appNameSuffix);
+ }
+ }
+
+ /**
+ * Extends the UnmanagedApplicationManager and overrides methods to provide a
+ * testable implementation.
+ */
+ protected class TestableUnmanagedApplicationManager
+ extends UnmanagedApplicationManager {
+
+ public TestableUnmanagedApplicationManager(Configuration conf,
+ ApplicationId appId, String queueName, String submitter,
+ String appNameSuffix) {
+ super(conf, appId, queueName, submitter, appNameSuffix);
+ }
+
+ /**
+ * We override this method here to return a mock RM instances. The base
+ * class returns the proxy to the real RM which will not work in case of
+ * stand alone test cases.
+ */
+ @Override
+ protected T createRMProxy(Class protocol, Configuration config,
+ UserGroupInformation user, Token token)
+ throws IOException {
+ return createSecondaryRMProxy(protocol, config,
+ YarnConfiguration.getClusterId(config));
+ }
+ }
+}