YARN-6281. Cleanup when AMRMProxy fails to initialize a new interceptor chain. (Botong Huang via Subru)
(cherry picked from commit 57a9afbd45b7ef8e6021cc58f96bd0074bf1389d)
This commit is contained in:
parent
cd9ff27ffc
commit
749e5c09b9
|
@ -319,11 +319,16 @@ public class AMRMProxyService extends AbstractService implements
|
|||
+ " ApplicationId:" + applicationAttemptId + " for the user: "
|
||||
+ user);
|
||||
|
||||
try {
|
||||
RequestInterceptor interceptorChain =
|
||||
this.createRequestInterceptorChain();
|
||||
interceptorChain.init(createApplicationMasterContext(
|
||||
interceptorChain.init(createApplicationMasterContext(this.nmContext,
|
||||
applicationAttemptId, user, amrmToken, localToken));
|
||||
chainWrapper.init(interceptorChain, applicationAttemptId);
|
||||
} catch (Exception e) {
|
||||
this.applPipelineMap.remove(applicationAttemptId.getApplicationId());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -339,8 +344,10 @@ public class AMRMProxyService extends AbstractService implements
|
|||
this.applPipelineMap.remove(applicationId);
|
||||
|
||||
if (pipeline == null) {
|
||||
LOG.info("Request to stop an application that does not exist. Id:"
|
||||
+ applicationId);
|
||||
LOG.info(
|
||||
"No interceptor pipeline for application {},"
|
||||
+ " likely because its AM is not run in this node.",
|
||||
applicationId);
|
||||
} else {
|
||||
// Remove the appAttempt in AMRMTokenSecretManager
|
||||
this.secretManager
|
||||
|
@ -413,11 +420,11 @@ public class AMRMProxyService extends AbstractService implements
|
|||
}
|
||||
|
||||
private AMRMProxyApplicationContext createApplicationMasterContext(
|
||||
ApplicationAttemptId applicationAttemptId, String user,
|
||||
Context context, ApplicationAttemptId applicationAttemptId, String user,
|
||||
Token<AMRMTokenIdentifier> amrmToken,
|
||||
Token<AMRMTokenIdentifier> localToken) {
|
||||
AMRMProxyApplicationContextImpl appContext =
|
||||
new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
|
||||
new AMRMProxyApplicationContextImpl(context, getConfig(),
|
||||
applicationAttemptId, user, amrmToken, localToken);
|
||||
return appContext;
|
||||
}
|
||||
|
|
|
@ -121,9 +121,9 @@ public abstract class BaseAMRMProxyTest {
|
|||
+ MockRequestInterceptor.class.getName());
|
||||
|
||||
this.dispatcher = new AsyncDispatcher();
|
||||
this.dispatcher.init(conf);
|
||||
this.dispatcher.init(this.conf);
|
||||
this.dispatcher.start();
|
||||
this.amrmProxyService = createAndStartAMRMProxyService();
|
||||
createAndStartAMRMProxyService(this.conf);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -137,12 +137,19 @@ public abstract class BaseAMRMProxyTest {
|
|||
return threadpool;
|
||||
}
|
||||
|
||||
protected MockAMRMProxyService createAndStartAMRMProxyService() {
|
||||
MockAMRMProxyService svc =
|
||||
protected Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
protected void createAndStartAMRMProxyService(Configuration config) {
|
||||
// Stop the existing instance first if not null
|
||||
if (this.amrmProxyService != null) {
|
||||
this.amrmProxyService.stop();
|
||||
}
|
||||
this.amrmProxyService =
|
||||
new MockAMRMProxyService(new NullContext(), dispatcher);
|
||||
svc.init(conf);
|
||||
svc.start();
|
||||
return svc;
|
||||
this.amrmProxyService.init(config);
|
||||
this.amrmProxyService.start();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
|
@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
@ -94,6 +97,32 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
|
|||
Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the case when interceptor pipeline initialization fails.
|
||||
*/
|
||||
@Test
|
||||
public void testInterceptorInitFailure() {
|
||||
Configuration conf = this.getConf();
|
||||
// Override with a bad interceptor configuration
|
||||
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||
"class.that.does.not.exist");
|
||||
|
||||
// Reinitialize instance with the new config
|
||||
createAndStartAMRMProxyService(conf);
|
||||
int testAppId = 1;
|
||||
try {
|
||||
registerApplicationMaster(testAppId);
|
||||
Assert.fail("Should not reach here. Expecting an exception thrown");
|
||||
} catch (Exception e) {
|
||||
Map<ApplicationId, RequestInterceptorChainWrapper> pipelines =
|
||||
getAMRMProxyService().getPipelines();
|
||||
ApplicationId id = getApplicationId(testAppId);
|
||||
Assert.assertTrue(
|
||||
"The interceptor pipeline should be removed if initializtion fails",
|
||||
pipelines.get(id) == null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the registration of multiple application master serially one at a
|
||||
* time.
|
||||
|
|
Loading…
Reference in New Issue