YARN-6704. Add support for work preserving NM restart when FederationInterceptor is enabled in AMRMProxyService. (Botong Huang via Subru).

(cherry picked from commit 670e8d4ec7)
(cherry picked from commit 850bd0ed7c)
This commit is contained in:
Subru Krishnan 2017-12-08 15:39:18 -08:00
parent 4a064dd644
commit 39d0fdf1b2
6 changed files with 387 additions and 32 deletions

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -616,7 +617,20 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
return GetContainersResponse.newInstance(null);
ApplicationAttemptId attemptId = request.getApplicationAttemptId();
List<ContainerReport> containers = new ArrayList<>();
synchronized (applicationContainerIdMap) {
// Return the list of running containers that were being tracked for this
// application
Assert.assertTrue("The application id is NOT registered: " + attemptId,
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
for (ContainerId c : ids) {
containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
null, null, 0, null, null));
}
}
return GetContainersResponse.newInstance(containers);
}
@Override

View File

@ -128,11 +128,8 @@ public class AMRMProxyService extends CompositeService implements
new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
this.secretManager.init(conf);
// Both second app attempt and NM restart within Federation need registry
if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED,
YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)
|| conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) {
YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)) {
this.registry = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.YARN_REGISTRY_CLASS,
YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,

View File

@ -37,16 +37,23 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
@ -59,6 +66,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@ -90,6 +99,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private static final Logger LOG =
LoggerFactory.getLogger(FederationInterceptor.class);
public static final String NMSS_CLASS_PREFIX = "FederationInterceptor/";
public static final String NMSS_REG_REQUEST_KEY =
NMSS_CLASS_PREFIX + "registerRequest";
public static final String NMSS_REG_RESPONSE_KEY =
NMSS_CLASS_PREFIX + "registerResponse";
/*
* When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
* Registry. Otherwise, if NM recovery is enabled, the UAM tokens are stored
* in the local NMSS instead, under this directory name.
*/
public static final String NMSS_SECONDARY_SC_PREFIX =
NMSS_CLASS_PREFIX + "secondarySC/";
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
/**
* The home sub-cluster is the sub-cluster where the AM container is running
* in.
@ -187,14 +212,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} catch (Exception ex) {
throw new YarnRuntimeException(ex);
}
// Add all app tokens for Yarn Registry access
if (this.registryClient != null && appContext.getCredentials() != null) {
this.appOwner.addCredentials(appContext.getCredentials());
if (appContext.getRegistryClient() != null) {
this.registryClient = new FederationRegistryClient(conf,
appContext.getRegistryClient(), this.appOwner);
// Add all app tokens for Yarn Registry access
if (appContext.getCredentials() != null) {
this.appOwner.addCredentials(appContext.getCredentials());
}
}
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRM = createHomeRMProxy(appContext);
this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
this.appOwner);
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@ -204,11 +235,137 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.uamPool.init(conf);
this.uamPool.start();
}
if (appContext.getRegistryClient() != null) {
this.registryClient = new FederationRegistryClient(conf,
appContext.getRegistryClient(), this.appOwner);
/**
 * Restores this interceptor's state from the entries previously saved for
 * the application attempt, then rebuilds the container-to-sub-cluster map.
 * <p>
 * Steps, in order: (1) parse the saved AM register request and response, if
 * present; (2) load the UAM AMRMTokens from Yarn Registry when a registry
 * client exists, otherwise from the entries whose key starts with
 * {@link #NMSS_SECONDARY_SC_PREFIX}; (3) re-attach and re-register each UAM,
 * recording the containers it reports from previous attempts; (4) ask the
 * home RM for the attempt's containers and record them as well; (5) reload
 * the AMRMProxy policy when a register response was recovered.
 *
 * @param recoveredDataMap the saved entries for this application attempt;
 *          when null, only the parent class recovery is performed
 * @throws YarnRuntimeException wrapping any IOException or YarnException
 *           raised outside of the per-UAM re-attach step
 */
@Override
public void recover(Map<String, byte[]> recoveredDataMap) {
  super.recover(recoveredDataMap);
  LOG.info("Recovering data for FederationInterceptor");
  if (recoveredDataMap == null) {
    return;
  }

  ApplicationAttemptId attemptId =
      getApplicationContext().getApplicationAttemptId();
  try {
    if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
      RegisterApplicationMasterRequestProto pb =
          RegisterApplicationMasterRequestProto
              .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
      this.amRegistrationRequest =
          new RegisterApplicationMasterRequestPBImpl(pb);
      LOG.info("amRegistrationRequest recovered for {}", attemptId);
    }
    if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
      RegisterApplicationMasterResponseProto pb =
          RegisterApplicationMasterResponseProto
              .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
      this.amRegistrationResponse =
          new RegisterApplicationMasterResponsePBImpl(pb);
      LOG.info("amRegistrationResponse recovered for {}", attemptId);
    }

    // Recover UAM amrmTokens from registry or NMSS
    Map<String, Token<AMRMTokenIdentifier>> uamMap;
    if (this.registryClient != null) {
      uamMap = this.registryClient
          .loadStateFromRegistry(attemptId.getApplicationId());
      LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
          uamMap.size(), attemptId.getApplicationId());
    } else {
      uamMap = new HashMap<>();
      for (Map.Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
        if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) {
          // entry for subClusterId -> UAM amrmToken; the sub-cluster id is
          // whatever follows the prefix in the key
          String scId =
              entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length());
          Token<AMRMTokenIdentifier> amrmToken = new Token<>();
          amrmToken.decodeFromUrlString(
              new String(entry.getValue(), STRING_TO_BYTE_FORMAT));
          uamMap.put(scId, amrmToken);
          LOG.debug("Recovered UAM in {} from NMSS", scId);
        }
      }
      LOG.info("Found {} existing UAMs for application {} in NMStateStore",
          uamMap.size(), attemptId.getApplicationId());
    }

    // Re-attach the UAMs
    int containers = 0;
    for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap
        .entrySet()) {
      SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey());

      // Create a config loaded with federation on and subclusterId
      // for each UAM
      YarnConfiguration config = new YarnConfiguration(getConf());
      FederationProxyProviderUtil.updateConfForFederation(config,
          subClusterId.getId());

      // A failure for one UAM is logged and does not stop the recovery of
      // the remaining UAMs or of the home RM containers.
      try {
        this.uamPool.reAttachUAM(subClusterId.getId(), config,
            attemptId.getApplicationId(),
            this.amRegistrationResponse.getQueue(),
            getApplicationContext().getUser(), this.homeSubClusterId.getId(),
            entry.getValue());

        RegisterApplicationMasterResponse response =
            this.uamPool.registerApplicationMaster(subClusterId.getId(),
                this.amRegistrationRequest);

        // Running containers from secondary RMs
        for (Container container : response
            .getContainersFromPreviousAttempts()) {
          containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
          containers++;
        }
        LOG.info("Recovered {} running containers from UAM in {}",
            response.getContainersFromPreviousAttempts().size(),
            subClusterId);
      } catch (Exception e) {
        LOG.error(
            "Error reattaching UAM to " + subClusterId + " for " + attemptId,
            e);
      }
    }

    // Get the running containers from home RM. Note that we will also get
    // the AM container itself from here. We don't need it, but there is no
    // harm in putting it in the map as well.
    UserGroupInformation appSubmitter = UserGroupInformation
        .createRemoteUser(getApplicationContext().getUser());
    ApplicationClientProtocol rmClient =
        createHomeRMProxy(getApplicationContext(),
            ApplicationClientProtocol.class, appSubmitter);

    GetContainersResponse response =
        rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
    for (ContainerReport container : response.getContainerList()) {
      containerIdToSubClusterIdMap.put(container.getContainerId(),
          this.homeSubClusterId);
      containers++;
      LOG.debug(" From home RM {} running container {}",
          this.homeSubClusterId, container.getContainerId());
    }
    // Two placeholders for two arguments, so the home sub-cluster id is
    // actually printed.
    LOG.info("{} running containers including AM recovered from home RM {}",
        response.getContainerList().size(), this.homeSubClusterId);

    LOG.info(
        "In all {} UAMs {} running containers including AM recovered for {}",
        uamMap.size(), containers, attemptId);

    if (this.amRegistrationResponse != null) {
      // Initialize the AMRMProxyPolicy
      String queue = this.amRegistrationResponse.getQueue();
      this.policyInterpreter =
          FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
              getConf(), this.federationFacade, this.homeSubClusterId);
    }
  } catch (IOException | YarnException e) {
    throw new YarnRuntimeException(e);
  }
}
/**
@ -242,6 +399,19 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Save the registration request. This will be used for registering with
// secondary sub-clusters using UAMs, as well as re-register later
this.amRegistrationRequest = request;
if (getNMStateStore() != null) {
try {
RegisterApplicationMasterRequestPBImpl pb =
(RegisterApplicationMasterRequestPBImpl)
this.amRegistrationRequest;
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ getApplicationContext().getApplicationAttemptId(), e);
}
}
}
/*
@ -278,6 +448,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
getApplicationContext().getApplicationAttemptId().getApplicationId();
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
if (getNMStateStore() != null) {
try {
RegisterApplicationMasterResponsePBImpl pb =
(RegisterApplicationMasterResponsePBImpl)
this.amRegistrationResponse;
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ getApplicationContext().getApplicationAttemptId(), e);
}
}
// the queue this application belongs will be used for getting
// AMRMProxy policy from state store.
String queue = this.amRegistrationResponse.getQueue();
@ -437,6 +621,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (uamResponse.getResponse() == null
|| !uamResponse.getResponse().getIsUnregistered()) {
failedToUnRegister = true;
} else if (getNMStateStore() != null) {
getNMStateStore().removeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
}
} catch (Throwable e) {
failedToUnRegister = true;
@ -496,6 +684,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
/**
 * Exposes the registry client held by this interceptor so that unit tests
 * can inspect it.
 *
 * @return the current value of the registry client field; callers in the
 *         tests null-check it before use
 */
@VisibleForTesting
protected FederationRegistryClient getRegistryClient() {
  return registryClient;
}
/**
* Create the UAM pool manager for secondary sub-clusters. For unit test to
* override.
@ -510,18 +703,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
* Returns instance of the ApplicationMasterProtocol proxy class that is used
* to connect to the Home resource manager.
* Create a proxy instance that is used to connect to the Home resource
* manager.
*
* @param appContext AMRMProxyApplicationContext
* @param protocol the protocol class for the proxy
* @param user the ugi for the proxy
* @param <T> the type of the proxy
* @return the proxy created
*/
protected ApplicationMasterProtocol createHomeRMProxy(
AMRMProxyApplicationContext appContext) {
protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
Class<T> protocol, UserGroupInformation user) {
try {
return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner,
appContext.getAMRMToken());
protocol, this.homeSubClusterId, user, appContext.getAMRMToken());
} catch (Exception ex) {
throw new YarnRuntimeException(ex);
}
@ -810,17 +1005,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
responses.add(response);
}
// Save the new AMRMToken for the UAM in registry if present
// Save the new AMRMToken for the UAM if present
if (response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null);
// Update the token in registry
// Update the token in registry or NMSS
if (registryClient != null) {
registryClient
.writeAMRMTokenForUAM(
getApplicationContext().getApplicationAttemptId()
.getApplicationId(),
subClusterId.getId(), newToken);
} else if (getNMStateStore() != null) {
try {
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
newToken.encodeToUrlString()
.getBytes(STRING_TO_BYTE_FORMAT));
} catch (IOException e) {
LOG.error(
"Error storing UAM token as AMRMProxy "
+ "context entry in NMSS for "
+ getApplicationContext().getApplicationAttemptId(),
e);
}
}
}
@ -925,12 +1134,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
successfulRegistrations.put(uamResponse.getSubClusterId(),
uamResponse.getResponse());
// Save the UAM token in registry or NMSS
if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(
getApplicationContext().getApplicationAttemptId()
.getApplicationId(),
uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken());
} else if (getNMStateStore() != null) {
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
NMSS_SECONDARY_SC_PREFIX
+ uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken().encodeToUrlString()
.getBytes(STRING_TO_BYTE_FORMAT));
}
}
} catch (Exception e) {
@ -952,11 +1169,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private AllocateResponse mergeAllocateResponses(
AllocateResponse homeResponse) {
// Timing issue, we need to remove the completed and then save the new ones.
if (LOG.isDebugEnabled()) {
LOG.debug("Remove containers: "
+ homeResponse.getCompletedContainersStatuses());
LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
}
removeFinishedContainersFromCache(
homeResponse.getCompletedContainersStatuses());
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
@ -989,6 +1201,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private void removeFinishedContainersFromCache(
List<ContainerStatus> finishedContainers) {
for (ContainerStatus container : finishedContainers) {
LOG.debug("Completed container {}", container);
if (containerIdToSubClusterIdMap
.containsKey(container.getContainerId())) {
containerIdToSubClusterIdMap.remove(container.getContainerId());
@ -1146,12 +1359,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private void cacheAllocatedContainers(List<Container> containers,
SubClusterId subClusterId) {
for (Container container : containers) {
LOG.debug("Adding container {}", container);
if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
SubClusterId existingSubClusterId =
containerIdToSubClusterIdMap.get(container.getId());
if (existingSubClusterId.equals(subClusterId)) {
// When RM fails over, the new RM master might send out the same
// container allocation more than once. Just move on in this case.
/*
* When RM fails over, the new RM master might send out the same
* container allocation more than once.
*
* It is also possible because of a recent NM restart with NM recovery
* enabled. We recover running containers from RM. But RM might not have
* notified AM of some of these containers yet. When RM does notify,
* we will already have these containers in the map.
*
* Either case, just warn and move on.
*/
LOG.warn(
"Duplicate containerID: {} found in the allocated containers"
+ " from same sub-cluster: {}, so ignoring.",
@ -1226,7 +1449,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private boolean warnIfNotExists(ContainerId containerId, String actionName) {
if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
LOG.error("AM is trying to {} a container {} that does not exist. ",
LOG.error(
"AM is trying to {} a container {} that does not exist. Might happen "
+ "shortly after NM restart when NM recovery is enabled",
actionName, containerId.toString());
return false;
}

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -180,6 +181,20 @@ public abstract class BaseAMRMProxyTest {
return new NMContext(null, null, null, null, stateStore, false, this.conf);
}
/**
 * Utility for the interceptor recover unit tests: loads the AMRMProxy state
 * from the given NM state store and returns the entries recorded for one
 * application attempt.
 *
 * @param nmStateStore the state store to load the AMRMProxy state from
 * @param attemptId the application attempt whose entries are wanted
 * @return the key/value entries stored for the attempt, or null when the
 *         loaded state has no entry for it
 * @throws IOException propagated from loading the AMRMProxy state
 */
protected Map<String, byte[]> recoverDataMapForAppAttempt(
    NMStateStoreService nmStateStore, ApplicationAttemptId attemptId)
    throws IOException {
  RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState();
  // Direct key lookup instead of scanning every entry; yields null when the
  // attempt is not present.
  return state.getAppContexts().get(attemptId);
}
protected List<ContainerId> getCompletedContainerIds(
List<ContainerStatus> containerStatus) {
List<ContainerId> ret = new ArrayList<>();

View File

@ -449,6 +449,104 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
}
}
/**
 * Runs the shared recover scenario with the test registry supplied to the
 * interceptor, i.e. the UAM token is expected in the registry rather than
 * in the NM state store.
 */
@Test
public void testRecoverWithAMRMProxyHA() throws Exception {
  testRecover(registry);
}
/**
 * Runs the shared recover scenario without a registry, i.e. the UAM token
 * is expected in the NM state store.
 */
@Test
public void testRecoverWithoutAMRMProxyHA() throws Exception {
  testRecover(null);
}
/**
 * Shared body of the recover tests. As the application user it registers
 * the AM, allocates containers across the home sub-cluster and "SC-1",
 * replaces the interceptor with a fresh instance that recovers from the
 * saved entries, and then releases the containers and finishes the
 * application, checking the UAM bookkeeping at each stage.
 *
 * @param registryObj the registry passed to the interceptor's application
 *          context; when null the UAM token is expected in the NM state
 *          store, otherwise it must not be there
 * @throws Exception if any step of the scenario fails
 */
public void testRecover(final RegistryOperations registryObj) throws Exception {
  final boolean withRegistry = registryObj != null;
  final String scEntry =
      FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1";

  PrivilegedExceptionAction<Object> scenario =
      new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          interceptor = new TestableFederationInterceptor();
          interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
              getConf(), attemptId, "test-user", null, null, null,
              registryObj));
          interceptor.cleanupRegistry();

          // Register the application
          RegisterApplicationMasterRequest registration =
              Records.newRecord(RegisterApplicationMasterRequest.class);
          registration.setHost(Integer.toString(testAppId));
          registration.setRpcPort(testAppId);
          registration.setTrackingUrl("");

          RegisterApplicationMasterResponse registrationResult =
              interceptor.registerApplicationMaster(registration);
          Assert.assertNotNull(registrationResult);

          Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());

          // Allocate one batch of containers
          registerSubCluster(SubClusterId.newInstance("SC-1"));
          registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));

          int containerCount = 3;
          List<Container> allocated =
              getContainersAndAssert(containerCount, containerCount * 2);
          Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());

          // Prepare for Federation Interceptor restart and recover
          Map<String, byte[]> savedEntries =
              recoverDataMapForAppAttempt(nmStateStore, attemptId);
          if (withRegistry) {
            // When AMRMProxy HA is enabled, NMSS should not have the UAM
            // token, it should be in Registry
            Assert.assertFalse(savedEntries.containsKey(scEntry));
          } else {
            Assert.assertTrue(savedEntries.containsKey(scEntry));
          }

          // Preserve the mock RM instances
          MockResourceManagerFacade preservedHomeRM = interceptor.getHomeRM();
          ConcurrentHashMap<String, MockResourceManagerFacade>
              preservedSecondaries = interceptor.getSecondaryRMs();

          // Create a new interceptor instance and recover
          interceptor = new TestableFederationInterceptor(preservedHomeRM,
              preservedSecondaries);
          interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
              getConf(), attemptId, "test-user", null, null, null,
              registryObj));
          interceptor.recover(savedEntries);

          Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());

          // Release all containers
          releaseContainersAndAssert(allocated);

          // Finish the application
          FinishApplicationMasterRequest finishRequest =
              Records.newRecord(FinishApplicationMasterRequest.class);
          finishRequest.setDiagnostics("");
          finishRequest.setTrackingUrl("");
          finishRequest
              .setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);

          FinishApplicationMasterResponse finishResult =
              interceptor.finishApplicationMaster(finishRequest);
          Assert.assertNotNull(finishResult);
          Assert.assertEquals(true, finishResult.getIsUnregistered());

          // After the application succeeds, the registry/NMSS entry should
          // be cleaned up
          if (withRegistry) {
            Assert.assertEquals(0,
                interceptor.getRegistryClient().getAllApplications().size());
          } else {
            savedEntries =
                recoverDataMapForAppAttempt(nmStateStore, attemptId);
            Assert.assertFalse(savedEntries.containsKey(scEntry));
          }
          return null;
        }
      };

  ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
  userInfo.getUser().doAs(scenario);
}
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root =
@ -636,6 +734,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
// After the application succeeds, the registry entry should be deleted
if (interceptor.getRegistryClient() != null) {
Assert.assertEquals(0,
interceptor.getRegistryClient().getAllApplications().size());
}
return null;
}
});

View File

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -59,16 +58,17 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return new TestableUnmanagedAMPoolManager(threadPool);
}
@SuppressWarnings("unchecked")
@Override
protected ApplicationMasterProtocol createHomeRMProxy(
AMRMProxyApplicationContext appContext) {
protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
Class<T> protocol, UserGroupInformation user) {
synchronized (this) {
if (mockRm == null) {
mockRm = new MockResourceManagerFacade(
new YarnConfiguration(super.getConf()), 0);
}
}
return mockRm;
return (T) mockRm;
}
@SuppressWarnings("unchecked")