YARN-6281. Cleanup when AMRMProxy fails to initialize a new interceptor chain. (Botong Huang via Subru)
(cherry picked from commit 57a9afbd45b7ef8e6021cc58f96bd0074bf1389d)
(cherry picked from commit 749e5c09b9
)
This commit is contained in:
parent
7511cfb842
commit
d8f0d409b9
|
@ -319,11 +319,16 @@ public class AMRMProxyService extends AbstractService implements
|
||||||
+ " ApplicationId:" + applicationAttemptId + " for the user: "
|
+ " ApplicationId:" + applicationAttemptId + " for the user: "
|
||||||
+ user);
|
+ user);
|
||||||
|
|
||||||
|
try {
|
||||||
RequestInterceptor interceptorChain =
|
RequestInterceptor interceptorChain =
|
||||||
this.createRequestInterceptorChain();
|
this.createRequestInterceptorChain();
|
||||||
interceptorChain.init(createApplicationMasterContext(
|
interceptorChain.init(createApplicationMasterContext(this.nmContext,
|
||||||
applicationAttemptId, user, amrmToken, localToken));
|
applicationAttemptId, user, amrmToken, localToken));
|
||||||
chainWrapper.init(interceptorChain, applicationAttemptId);
|
chainWrapper.init(interceptorChain, applicationAttemptId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
this.applPipelineMap.remove(applicationAttemptId.getApplicationId());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -339,8 +344,10 @@ public class AMRMProxyService extends AbstractService implements
|
||||||
this.applPipelineMap.remove(applicationId);
|
this.applPipelineMap.remove(applicationId);
|
||||||
|
|
||||||
if (pipeline == null) {
|
if (pipeline == null) {
|
||||||
LOG.info("Request to stop an application that does not exist. Id:"
|
LOG.info(
|
||||||
+ applicationId);
|
"No interceptor pipeline for application {},"
|
||||||
|
+ " likely because its AM is not run in this node.",
|
||||||
|
applicationId);
|
||||||
} else {
|
} else {
|
||||||
// Remove the appAttempt in AMRMTokenSecretManager
|
// Remove the appAttempt in AMRMTokenSecretManager
|
||||||
this.secretManager
|
this.secretManager
|
||||||
|
@ -413,11 +420,11 @@ public class AMRMProxyService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
private AMRMProxyApplicationContext createApplicationMasterContext(
|
private AMRMProxyApplicationContext createApplicationMasterContext(
|
||||||
ApplicationAttemptId applicationAttemptId, String user,
|
Context context, ApplicationAttemptId applicationAttemptId, String user,
|
||||||
Token<AMRMTokenIdentifier> amrmToken,
|
Token<AMRMTokenIdentifier> amrmToken,
|
||||||
Token<AMRMTokenIdentifier> localToken) {
|
Token<AMRMTokenIdentifier> localToken) {
|
||||||
AMRMProxyApplicationContextImpl appContext =
|
AMRMProxyApplicationContextImpl appContext =
|
||||||
new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
|
new AMRMProxyApplicationContextImpl(context, getConfig(),
|
||||||
applicationAttemptId, user, amrmToken, localToken);
|
applicationAttemptId, user, amrmToken, localToken);
|
||||||
return appContext;
|
return appContext;
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,9 +120,9 @@ public abstract class BaseAMRMProxyTest {
|
||||||
+ MockRequestInterceptor.class.getName());
|
+ MockRequestInterceptor.class.getName());
|
||||||
|
|
||||||
this.dispatcher = new AsyncDispatcher();
|
this.dispatcher = new AsyncDispatcher();
|
||||||
this.dispatcher.init(conf);
|
this.dispatcher.init(this.conf);
|
||||||
this.dispatcher.start();
|
this.dispatcher.start();
|
||||||
this.amrmProxyService = createAndStartAMRMProxyService();
|
createAndStartAMRMProxyService(this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -136,12 +136,19 @@ public abstract class BaseAMRMProxyTest {
|
||||||
return threadpool;
|
return threadpool;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MockAMRMProxyService createAndStartAMRMProxyService() {
|
protected Configuration getConf() {
|
||||||
MockAMRMProxyService svc =
|
return this.conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void createAndStartAMRMProxyService(Configuration config) {
|
||||||
|
// Stop the existing instance first if not null
|
||||||
|
if (this.amrmProxyService != null) {
|
||||||
|
this.amrmProxyService.stop();
|
||||||
|
}
|
||||||
|
this.amrmProxyService =
|
||||||
new MockAMRMProxyService(new NullContext(), dispatcher);
|
new MockAMRMProxyService(new NullContext(), dispatcher);
|
||||||
svc.init(conf);
|
this.amrmProxyService.init(config);
|
||||||
svc.start();
|
this.amrmProxyService.start();
|
||||||
return svc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
@ -94,6 +97,32 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
|
||||||
Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
|
Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case when interceptor pipeline initialization fails.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInterceptorInitFailure() {
|
||||||
|
Configuration conf = this.getConf();
|
||||||
|
// Override with a bad interceptor configuration
|
||||||
|
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||||
|
"class.that.does.not.exist");
|
||||||
|
|
||||||
|
// Reinitialize instance with the new config
|
||||||
|
createAndStartAMRMProxyService(conf);
|
||||||
|
int testAppId = 1;
|
||||||
|
try {
|
||||||
|
registerApplicationMaster(testAppId);
|
||||||
|
Assert.fail("Should not reach here. Expecting an exception thrown");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Map<ApplicationId, RequestInterceptorChainWrapper> pipelines =
|
||||||
|
getAMRMProxyService().getPipelines();
|
||||||
|
ApplicationId id = getApplicationId(testAppId);
|
||||||
|
Assert.assertTrue(
|
||||||
|
"The interceptor pipeline should be removed if initializtion fails",
|
||||||
|
pipelines.get(id) == null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the registration of multiple application master serially one at a
|
* Tests the registration of multiple application master serially one at a
|
||||||
* time.
|
* time.
|
||||||
|
|
Loading…
Reference in New Issue