diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index 2696bca6c44..aeb3be8aa62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -319,11 +319,16 @@ public class AMRMProxyService extends AbstractService implements + " ApplicationId:" + applicationAttemptId + " for the user: " + user); - RequestInterceptor interceptorChain = - this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext( - applicationAttemptId, user, amrmToken, localToken)); - chainWrapper.init(interceptorChain, applicationAttemptId); + try { + RequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(createApplicationMasterContext(this.nmContext, + applicationAttemptId, user, amrmToken, localToken)); + chainWrapper.init(interceptorChain, applicationAttemptId); + } catch (Exception e) { + this.applPipelineMap.remove(applicationAttemptId.getApplicationId()); + throw e; + } } /** @@ -339,8 +344,10 @@ public class AMRMProxyService extends AbstractService implements this.applPipelineMap.remove(applicationId); if (pipeline == null) { - LOG.info("Request to stop an application that does not exist. Id:" - + applicationId); + LOG.info( + "No interceptor pipeline for application {}," + + " likely because its AM is not run in this node.", + applicationId); } else { // Remove the appAttempt in AMRMTokenSecretManager this.secretManager @@ -413,11 +420,11 @@ public class AMRMProxyService extends AbstractService implements } private AMRMProxyApplicationContext createApplicationMasterContext( - ApplicationAttemptId applicationAttemptId, String user, + Context context, ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, Token localToken) { AMRMProxyApplicationContextImpl appContext = - new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(), + new AMRMProxyApplicationContextImpl(context, getConfig(), applicationAttemptId, user, amrmToken, localToken); return appContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 4ec50691609..4b317e504af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -120,9 +120,9 @@ public abstract class BaseAMRMProxyTest { + MockRequestInterceptor.class.getName()); this.dispatcher = new AsyncDispatcher(); - this.dispatcher.init(conf); + this.dispatcher.init(this.conf); this.dispatcher.start(); - this.amrmProxyService = createAndStartAMRMProxyService(); + createAndStartAMRMProxyService(this.conf); } @After @@ -136,12 +136,19 @@ public abstract class BaseAMRMProxyTest { return threadpool; } - protected MockAMRMProxyService createAndStartAMRMProxyService() { - MockAMRMProxyService svc = + protected Configuration getConf() { + return this.conf; + } + + protected void createAndStartAMRMProxyService(Configuration config) { + // Stop the existing instance first if not null + if (this.amrmProxyService != null) { + this.amrmProxyService.stop(); + } + this.amrmProxyService = new MockAMRMProxyService(new NullContext(), dispatcher); - svc.init(conf); - svc.start(); - return svc; + this.amrmProxyService.init(config); + this.amrmProxyService.start(); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 837278c234e..fa17f2668e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; @@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; @@ -94,6 +97,32 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { Assert.assertEquals(Integer.toString(testAppId), response1.getQueue()); } + /** + * Tests the case when interceptor pipeline initialization fails. + */ + @Test + public void testInterceptorInitFailure() { + Configuration conf = this.getConf(); + // Override with a bad interceptor configuration + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + "class.that.does.not.exist"); + + // Reinitialize instance with the new config + createAndStartAMRMProxyService(conf); + int testAppId = 1; + try { + registerApplicationMaster(testAppId); + Assert.fail("Should not reach here. Expecting an exception thrown"); + } catch (Exception e) { + Map pipelines = + getAMRMProxyService().getPipelines(); + ApplicationId id = getApplicationId(testAppId); + Assert.assertTrue( + "The interceptor pipeline should be removed if initializtion fails", + pipelines.get(id) == null); + } + } + /** * Tests the registration of multiple application master serially one at a * time.