YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-09-24 11:40:07 -07:00
parent 60565976e1
commit 6937925838
12 changed files with 627 additions and 302 deletions

View File

@ -2835,6 +2835,11 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.federation.resolver."
+ "DefaultSubClusterResolverImpl";
// the maximum wait time for the first async heartbeat response
public static final String FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS =
FEDERATION_PREFIX + "amrmproxy.hb.maximum.wait.ms";
public static final long DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = 5000;
// AMRMProxy split-merge timeout for active sub-clusters. We will not route
// new asks to expired sub-clusters.
public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =

View File

@ -101,6 +101,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);

View File

@ -44,6 +44,8 @@ public final class AMRMClientUtils {
private static final Logger LOG =
LoggerFactory.getLogger(AMRMClientUtils.class);
public static final int PRE_REGISTER_RESPONSE_ID = -1;
public static final String APP_ALREADY_REGISTERED_MESSAGE =
"Application Master is already registered : ";
@ -143,4 +145,10 @@ public final class AMRMClientUtils {
return -1;
}
}
public static int getNextResponseId(int responseId) {
// Loop between 0 to Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;
}
}

View File

@ -47,6 +47,9 @@ public class AMHeartbeatRequestHandler extends Thread {
// Indication flag for the thread to keep running
private volatile boolean keepRunning;
// For unit test draining
private volatile boolean isThreadWaiting;
private Configuration conf;
private ApplicationId applicationId;
@ -61,6 +64,7 @@ public class AMHeartbeatRequestHandler extends Thread {
this.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
this.keepRunning = true;
this.isThreadWaiting = false;
this.conf = conf;
this.applicationId = applicationId;
@ -82,12 +86,15 @@ public class AMHeartbeatRequestHandler extends Thread {
while (keepRunning) {
AsyncAllocateRequestInfo requestInfo;
try {
requestInfo = requestQueue.take();
this.isThreadWaiting = true;
requestInfo = this.requestQueue.take();
this.isThreadWaiting = false;
if (requestInfo == null) {
throw new YarnException(
"Null requestInfo taken from request queue");
}
if (!keepRunning) {
if (!this.keepRunning) {
break;
}
@ -98,7 +105,7 @@ public class AMHeartbeatRequestHandler extends Thread {
throw new YarnException("Null allocateRequest from requestInfo");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
LOG.debug("Sending Heartbeat to RM. AskList:"
+ ((request.getAskList() == null) ? " empty"
: request.getAskList().size()));
}
@ -181,6 +188,16 @@ public class AMHeartbeatRequestHandler extends Thread {
}
}
@VisibleForTesting
public void drainHeartbeatThread() {
while (!this.isThreadWaiting || this.requestQueue.size() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
@VisibleForTesting
public int getRequestQueueSize() {
return this.requestQueue.size();

View File

@ -78,7 +78,7 @@ public class FederationRegistryClient {
*
* @return the list of known applications
*/
public List<String> getAllApplications() {
public synchronized List<String> getAllApplications() {
// Suppress the exception here because it is valid that the entry does not
// exist
List<String> applications = null;
@ -99,7 +99,7 @@ public class FederationRegistryClient {
* For testing, delete all application records in registry.
*/
@VisibleForTesting
public void cleanAllApplications() {
public synchronized void cleanAllApplications() {
try {
removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
true, false);
@ -115,7 +115,7 @@ public class FederationRegistryClient {
* @param token the UAM of the application
* @return whether the amrmToken is added or updated to a new value
*/
public boolean writeAMRMTokenForUAM(ApplicationId appId,
public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
String subClusterId, Token<AMRMTokenIdentifier> token) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
@ -154,7 +154,7 @@ public class FederationRegistryClient {
* @param appId application id
* @return the sub-cluster to UAM token mapping
*/
public Map<String, Token<AMRMTokenIdentifier>>
public synchronized Map<String, Token<AMRMTokenIdentifier>>
loadStateFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
// Suppress the exception here because it is valid that the entry does not
@ -203,7 +203,7 @@ public class FederationRegistryClient {
*
* @param appId application id
*/
public void removeAppFromRegistry(ApplicationId appId) {
public synchronized void removeAppFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
LOG.info("Removing all registry entries for {}", appId);

View File

@ -406,4 +406,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
}
@VisibleForTesting
public int getRequestQueueSize(String uamId) throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
return this.unmanagedAppMasterMap.get(uamId).getRequestQueueSize();
}
@VisibleForTesting
public void drainUAMHeartbeats() {
for (UnmanagedApplicationManager uam : this.unmanagedAppMasterMap
.values()) {
uam.drainHeartbeatThread();
}
}
}

View File

@ -225,6 +225,10 @@ public class UnmanagedApplicationManager {
LOG.debug("RegisterUAM returned existing NM token for node "
+ nmToken.getNodeId());
}
LOG.info(
"RegisterUAM returned {} existing running container and {} NM tokens",
response.getContainersFromPreviousAttempts().size(),
response.getNMTokensFromPreviousAttempts().size());
// Only when register succeed that we start the heartbeat thread
this.heartbeatHandler.setDaemon(true);
@ -516,4 +520,14 @@ public class UnmanagedApplicationManager {
public int getRequestQueueSize() {
return this.heartbeatHandler.getRequestQueueSize();
}
@VisibleForTesting
protected void setHandlerThread(AMHeartbeatRequestHandler thread) {
this.heartbeatHandler = thread;
}
@VisibleForTesting
protected void drainHeartbeatThread() {
this.heartbeatHandler.drainHeartbeatThread();
}
}

View File

@ -174,8 +174,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private HashSet<ApplicationId> applicationMap = new HashSet<>();
private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
private HashMap<ApplicationAttemptId, List<ContainerId>>
applicationContainerIdMap = new HashMap<>();
private HashMap<ApplicationId, List<ContainerId>> applicationContainerIdMap =
new HashMap<>();
private int rmId;
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
private int subClusterId;
@ -188,6 +189,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private boolean shouldReRegisterNext = false;
private boolean shouldWaitForSyncNextAllocate = false;
// For unit test synchronization
private static Object syncObj = new Object();
@ -203,6 +206,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
int subClusterId, boolean isRunning) {
this.conf = conf;
this.rmId = startContainerIndex;
this.containerIndex.set(startContainerIndex);
this.subClusterId = subClusterId;
this.isRunning = isRunning;
@ -244,17 +248,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Registering application attempt: " + attemptId);
ApplicationId appId = attemptId.getApplicationId();
List<Container> containersFromPreviousAttempt = null;
synchronized (applicationContainerIdMap) {
if (applicationContainerIdMap.containsKey(attemptId)) {
if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
if (applicationContainerIdMap.containsKey(appId)) {
if (keepContainerOnUams.contains(appId)) {
// For UAM with the keepContainersFromPreviousAttempt flag, return all
// running containers
containersFromPreviousAttempt = new ArrayList<>();
for (ContainerId containerId : applicationContainerIdMap
.get(attemptId)) {
for (ContainerId containerId : applicationContainerIdMap.get(appId)) {
containersFromPreviousAttempt.add(Container.newInstance(containerId,
null, null, null, null, null));
}
@ -264,7 +268,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
} else {
// Keep track of the containers that are returned to this application
applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>());
applicationContainerIdMap.put(appId, new ArrayList<ContainerId>());
}
}
@ -299,6 +303,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Finishing application attempt: " + attemptId);
ApplicationId appId = attemptId.getApplicationId();
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@ -309,8 +314,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
Assert.assertTrue("The application id is NOT registered: " + attemptId,
applicationContainerIdMap.containsKey(attemptId));
applicationContainerIdMap.remove(attemptId);
applicationContainerIdMap.containsKey(appId));
applicationContainerIdMap.remove(appId);
}
return FinishApplicationMasterResponse.newInstance(
@ -335,6 +340,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Allocate from application attempt: " + attemptId);
ApplicationId appId = attemptId.getApplicationId();
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@ -342,6 +348,21 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throw new ApplicationMasterNotRegisteredException(message);
}
// Wait for signal for certain test cases
synchronized (syncObj) {
if (shouldWaitForSyncNextAllocate) {
shouldWaitForSyncNextAllocate = false;
LOG.info("Allocate call in RM start waiting");
try {
syncObj.wait();
LOG.info("Allocate call in RM wait finished");
} catch (InterruptedException e) {
LOG.info("Allocate call in RM wait interrupted", e);
}
}
}
ArrayList<Container> containerList = new ArrayList<Container>();
if (request.getAskList() != null) {
for (ResourceRequest rr : request.getAskList()) {
@ -366,9 +387,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// will need it in future
Assert.assertTrue(
"The application id is Not registered before allocate(): "
+ attemptId,
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+ appId,
applicationContainerIdMap.containsKey(appId));
List<ContainerId> ids = applicationContainerIdMap.get(appId);
ids.add(containerId);
}
}
@ -380,12 +401,10 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
&& request.getReleaseList().size() > 0) {
LOG.info("Releasing containers: " + request.getReleaseList().size());
synchronized (applicationContainerIdMap) {
Assert
.assertTrue(
"The application id is not registered before allocate(): "
+ attemptId,
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
Assert.assertTrue(
"The application id is not registered before allocate(): " + appId,
applicationContainerIdMap.containsKey(appId));
List<ContainerId> ids = applicationContainerIdMap.get(appId);
for (ContainerId id : request.getReleaseList()) {
boolean found = false;
@ -411,7 +430,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ " for application attempt: " + conf.get("AMRMTOKEN"));
// Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
Token newAMRMToken = Token.newInstance(new byte[0],
Integer.toString(this.rmId), new byte[0], "");
return AllocateResponse.newInstance(0, completedList, containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
@ -419,6 +439,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
new ArrayList<UpdatedContainer>(), null);
}
public void setWaitForSyncNextAllocate(boolean wait) {
synchronized (syncObj) {
shouldWaitForSyncNextAllocate = wait;
}
}
@Override
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
@ -610,14 +636,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
ApplicationAttemptId attemptId = request.getApplicationAttemptId();
ApplicationId appId = request.getApplicationAttemptId().getApplicationId();
List<ContainerReport> containers = new ArrayList<>();
synchronized (applicationContainerIdMap) {
// Return the list of running containers that were being tracked for this
// application
Assert.assertTrue("The application id is NOT registered: " + attemptId,
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
Assert.assertTrue("The application id is NOT registered: " + appId,
applicationContainerIdMap.containsKey(appId));
List<ContainerId> ids = applicationContainerIdMap.get(appId);
for (ContainerId c : ids) {
containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
null, null, 0, null, null));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -62,14 +64,18 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@ -80,9 +86,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -116,6 +122,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
NMSS_CLASS_PREFIX + "secondarySC/";
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
private static final RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
/**
* From AM's perspective, FederationInterceptor behaves exactly the same as
* YarnRM (ApplicationMasterService). This is to remember the last heart beat
* response, used to handle duplicate heart beat and responseId from AM.
*/
private AllocateResponse lastAllocateResponse;
private final Object lastAllocateResponseLock = new Object();
private ApplicationAttemptId attemptId;
/**
@ -124,7 +141,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private AMRMClientRelayer homeRMRelayer;
private SubClusterId homeSubClusterId;
private volatile int lastHomeResponseId;
private AMHeartbeatRequestHandler homeHeartbeartHandler;
/**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@ -146,7 +163,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers except the home RM.
* sub-cluster resource managers, including home RM.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
@ -194,14 +211,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/** The policy used to split requests among sub-clusters. */
private FederationAMRMProxyPolicy policyInterpreter;
/**
* The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
* issued by home RM.
*/
private UserGroupInformation appOwner;
private FederationRegistryClient registryClient;
// the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs;
private MonotonicClock clock = new MonotonicClock();
/**
* Creates an instance of the FederationInterceptor class.
*/
@ -213,7 +229,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.secondaryRelayers = new ConcurrentHashMap<>();
this.amRegistrationRequest = null;
this.amRegistrationResponse = null;
this.lastHomeResponseId = Integer.MAX_VALUE;
this.justRecovered = false;
}
@ -233,8 +248,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
setConf(conf);
}
// The proxy ugi used to talk to home RM as well as Yarn Registry, loaded
// with the up-to-date AMRMToken issued by home RM.
UserGroupInformation appOwner;
try {
this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
UserGroupInformation.getCurrentUser());
} catch (Exception ex) {
throw new YarnRuntimeException(ex);
@ -242,10 +260,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (appContext.getRegistryClient() != null) {
this.registryClient = new FederationRegistryClient(conf,
appContext.getRegistryClient(), this.appOwner);
appContext.getRegistryClient(), appOwner);
// Add all app tokens for Yarn Registry access
if (appContext.getCredentials() != null) {
this.appOwner.addCredentials(appContext.getCredentials());
appOwner.addCredentials(appContext.getCredentials());
}
}
@ -254,9 +272,21 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
ApplicationMasterProtocol.class, this.appOwner), appId,
ApplicationMasterProtocol.class, appOwner), appId,
this.homeSubClusterId.toString());
this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId);
this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer);
this.homeHeartbeartHandler.setUGI(appOwner);
this.homeHeartbeartHandler.setDaemon(true);
this.homeHeartbeartHandler.start();
// set lastResponseId to -1 before application master registers
this.lastAllocateResponse =
RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
this.lastAllocateResponse
.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@ -265,6 +295,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.uamPool.init(conf);
this.uamPool.start();
this.heartbeatMaxWaitTimeMs =
conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
}
@Override
@ -272,6 +306,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
super.recover(recoveredDataMap);
LOG.info("Recovering data for FederationInterceptor for {}",
this.attemptId);
this.justRecovered = true;
if (recoveredDataMap == null) {
return;
}
@ -294,9 +330,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationResponse =
new RegisterApplicationMasterResponsePBImpl(pb);
LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
// Trigger re-register and full pending re-send only if we have a
// saved register response. This should always be true though.
this.justRecovered = true;
}
// Recover UAM amrmTokens from registry or NMSS
@ -355,6 +388,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.getContainersFromPreviousAttempts()) {
containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
containers++;
LOG.debug(" From subcluster " + subClusterId
+ " running container " + container.getId());
}
LOG.info("Recovered {} running containers from UAM in {}",
response.getContainersFromPreviousAttempts().size(),
@ -384,7 +419,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
LOG.debug(" From home RM " + this.homeSubClusterId
+ " running container " + container.getContainerId());
}
LOG.info("{} running containers including AM recovered from home RM ",
LOG.info("{} running containers including AM recovered from home RM {}",
response.getContainerList().size(), this.homeSubClusterId);
LOG.info(
@ -411,8 +446,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* so that when AM registers more than once, it returns the same register
* success response instead of throwing
* {@link InvalidApplicationMasterRequestException}. Furthermore, we present
* to AM as if we are the RM that never fails over. When actual RM fails over,
* we always re-register automatically.
* to AM as if we are the RM that never fails over (except when AMRMProxy
* restarts). When actual RM fails over, we always re-register automatically.
*
* We did this because FederationInterceptor can receive concurrent register
* requests from AM because of timeout between AM and AMRMProxy, which is
@ -425,6 +460,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public synchronized RegisterApplicationMasterResponse
registerApplicationMaster(RegisterApplicationMasterRequest request)
throws YarnException, IOException {
// Reset the heartbeat responseId to zero upon register
synchronized (this.lastAllocateResponseLock) {
this.lastAllocateResponse.setResponseId(0);
}
this.justRecovered = false;
// If AM is calling with a different request, complain
if (this.amRegistrationRequest != null) {
if (!this.amRegistrationRequest.equals(request)) {
@ -524,34 +566,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException {
throws YarnException, IOException {
Preconditions.checkArgument(this.policyInterpreter != null,
"Allocate should be called after registerApplicationMaster");
if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
// Save the responseId home RM is expecting
this.lastHomeResponseId = request.getResponseId();
if (this.justRecovered) {
throw new ApplicationMasterNotRegisteredException(
"AMRMProxy just restarted and recovered for " + this.attemptId
+ ". AM should re-register and full re-send pending requests.");
}
// Override responseId in the request in two cases:
//
// 1. After we just recovered after an NM restart and AM's responseId is
// reset due to the exception we generate. We need to override the
// responseId to the one homeRM expects.
//
// 2. After homeRM fail-over, the allocate response with reseted responseId
// might not be returned successfully back to AM because of RPC connection
// timeout between AM and AMRMProxy. In this case, we remember and reset the
// responseId for AM.
if (this.justRecovered
|| request.getResponseId() > this.lastHomeResponseId) {
LOG.warn("Setting allocate responseId for {} from {} to {}",
this.attemptId, request.getResponseId(), this.lastHomeResponseId);
request.setResponseId(this.lastHomeResponseId);
// Check responseId and handle duplicate heartbeat exactly same as RM
synchronized (this.lastAllocateResponseLock) {
LOG.info("Heartbeat from " + this.attemptId + " with responseId "
+ request.getResponseId() + " when we are expecting "
+ this.lastAllocateResponse.getResponseId());
// Normally request.getResponseId() == lastResponse.getResponseId()
if (AMRMClientUtils.getNextResponseId(
request.getResponseId()) == this.lastAllocateResponse
.getResponseId()) {
// heartbeat one step old, simply return lastReponse
return this.lastAllocateResponse;
} else if (request.getResponseId() != this.lastAllocateResponse
.getResponseId()) {
throw new InvalidApplicationMasterRequestException(
AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId,
this.lastAllocateResponse.getResponseId(),
request.getResponseId()));
}
}
try {
@ -560,71 +602,55 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<SubClusterId, AllocateRequest> requests =
splitAllocateRequest(request);
// Send the requests to the secondary sub-cluster resource managers.
// These secondary requests are send asynchronously and the responses will
// be collected and merged with the home response. In addition, it also
// return the newly registered Unmanaged AMs.
Registrations newRegistrations =
sendRequestsToSecondaryResourceManagers(requests);
/**
* Send the requests to the all sub-cluster resource managers. All
* requests are synchronously triggered but sent asynchronously. Later the
* responses will be collected and merged. In addition, it also returns
* the newly registered UAMs.
*/
Registrations newRegistrations = sendRequestsToResourceManagers(requests);
// Send the request to the home RM and get the response
AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
homeRequest.getResponseId());
AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
// Reset the flag after the first successful homeRM allocate response,
// otherwise keep overriding the responseId of new allocate request
if (this.justRecovered) {
this.justRecovered = false;
// Wait for the first async response to arrive
long startTime = this.clock.getTime();
synchronized (this.asyncResponseSink) {
try {
this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs);
} catch (InterruptedException e) {
}
}
long firstResponseTime = this.clock.getTime() - startTime;
// Notify policy of home response
// An extra brief wait for other async heart beats, so that most of their
// responses can make it back to AM in the same heart beat round trip.
try {
this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
homeResponse);
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+ this.homeSubClusterId, e);
Thread.sleep(firstResponseTime);
} catch (InterruptedException e) {
}
// If the resource manager sent us a new token, add to the current user
if (homeResponse.getAMRMToken() != null) {
LOG.debug("Received new AMRMToken");
YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
this.appOwner, getConf());
}
// Prepare the response to AM
AllocateResponse response =
RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
// Merge the responses from home and secondary sub-cluster RMs
homeResponse = mergeAllocateResponses(homeResponse);
// Merge all responses from response sink
mergeAllocateResponses(response);
// Merge the containers and NMTokens from the new registrations into
// the homeResponse.
// the response
if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
homeResponse = mergeRegistrationResponses(homeResponse,
mergeRegistrationResponses(response,
newRegistrations.getSuccessfulRegistrations());
}
LOG.info("{} heartbeat response from home RM with responseId {}",
this.attemptId, homeResponse.getResponseId());
// Update lastHomeResponseId in three cases:
// 1. The normal responseId increments
// 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
// over, AMRMClientRelayer auto re-register and full re-send for homeRM.
// 3. lastHomeResponseId == MAX_INT. This is the initial case or
// responseId about to overflow and wrap around
if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
|| homeResponse.getResponseId() == 1
|| this.lastHomeResponseId == Integer.MAX_VALUE) {
this.lastHomeResponseId = homeResponse.getResponseId();
// update the responseId and return the final response to AM
synchronized (this.lastAllocateResponseLock) {
response.setResponseId(AMRMClientUtils
.getNextResponseId(this.lastAllocateResponse.getResponseId()));
this.lastAllocateResponse = response;
}
// return the final response to the application master.
return homeResponse;
} catch (IOException ex) {
LOG.error("Exception encountered while processing heart beat", ex);
return response;
} catch (Throwable ex) {
LOG.error("Exception encountered while processing heart beat for "
+ this.attemptId, ex);
throw new YarnException(ex);
}
}
@ -696,6 +722,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
FinishApplicationMasterResponse homeResponse =
this.homeRMRelayer.finishApplicationMaster(request);
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
if (subClusterIds.size() > 0) {
// Wait for other sub-cluster resource managers to return the
// response and merge it with the home response
@ -758,10 +787,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
this.threadpool = null;
}
homeRMRelayer.shutdown();
for(AMRMClientRelayer relayer : secondaryRelayers.values()){
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
this.homeRMRelayer.shutdown();
for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) {
relayer.shutdown();
}
super.shutdown();
}
@ -781,8 +814,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
@VisibleForTesting
protected int getLastHomeResponseId() {
return this.lastHomeResponseId;
protected ApplicationAttemptId getAttemptId() {
return this.attemptId;
}
@VisibleForTesting
protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() {
return this.homeHeartbeartHandler;
}
/**
@ -798,6 +836,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return new UnmanagedAMPoolManager(threadPool);
}
@VisibleForTesting
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
Configuration conf, ApplicationId appId) {
return new AMHeartbeatRequestHandler(conf, appId);
}
/**
* Create a proxy instance that is used to connect to the Home resource
* manager.
@ -872,7 +916,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ "Reattaching in parallel", uamMap.size(), appId);
ExecutorCompletionService<RegisterApplicationMasterResponse>
completionService = new ExecutorCompletionService<>(threadpool);
completionService = new ExecutorCompletionService<>(this.threadpool);
for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
final SubClusterId subClusterId =
@ -1047,16 +1091,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* This methods sends the specified AllocateRequests to the appropriate
* sub-cluster resource managers.
* sub-cluster resource managers asynchronously.
*
* @param requests contains the heart beat requests to send to the resource
* manager keyed by the resource manager address
* manager keyed by the sub-cluster id
* @return the registration responses from the newly added sub-cluster
* resource managers
* @throws YarnException
* @throws IOException
*/
private Registrations sendRequestsToSecondaryResourceManagers(
private Registrations sendRequestsToResourceManagers(
Map<SubClusterId, AllocateRequest> requests)
throws YarnException, IOException {
@ -1065,32 +1109,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Registrations registrations = registerWithNewSubClusters(requests.keySet());
// Now that all the registrations are done, send the allocation request
// to the sub-cluster RMs using the Unmanaged application masters
// asynchronously and don't wait for the response. The responses will
// arrive asynchronously and will be added to the response sink. These
// responses will be sent to the application master in some future heart
// beat response.
// to the sub-cluster RMs asynchronously and don't wait for the response.
// The responses will arrive asynchronously and will be added to the
// response sink, then merged and sent to the application master.
for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
final SubClusterId subClusterId = entry.getKey();
SubClusterId subClusterId = entry.getKey();
if (subClusterId.equals(this.homeSubClusterId)) {
// Skip the request for the home sub-cluster resource manager.
// It will be handled separately in the allocate() method
continue;
// Request for the home sub-cluster resource manager
this.homeHeartbeartHandler.allocateAsync(entry.getValue(),
new HeartbeatCallBack(this.homeSubClusterId, false));
} else {
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
// TODO: This means that the registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
subClusterId);
continue;
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new HeartbeatCallBack(subClusterId, true));
}
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
// TODO: This means that the registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
subClusterId);
continue;
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new HeartbeatCallBack(subClusterId));
}
return registrations;
@ -1123,7 +1164,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationRequest;
final AMRMProxyApplicationContext appContext = getApplicationContext();
ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
completionService = new ExecutorCompletionService<>(threadpool);
completionService = new ExecutorCompletionService<>(this.threadpool);
for (final String subClusterId : newSubClusters) {
completionService
@ -1208,21 +1249,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
* Merges the responses from other sub-clusters that we received
* asynchronously with the specified home cluster response and keeps track of
* the containers received from each sub-cluster resource managers.
* Merge the responses from all sub-clusters that we received asynchronously
* and keeps track of the containers received from each sub-cluster resource
* managers.
*/
private AllocateResponse mergeAllocateResponses(
AllocateResponse homeResponse) {
// Timing issue, we need to remove the completed and then save the new ones.
removeFinishedContainersFromCache(
homeResponse.getCompletedContainersStatuses());
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
this.homeSubClusterId);
private void mergeAllocateResponses(AllocateResponse mergedResponse) {
synchronized (this.asyncResponseSink) {
for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
.entrySet()) {
for (Entry<SubClusterId, List<AllocateResponse>> entry :
this.asyncResponseSink.entrySet()) {
SubClusterId subClusterId = entry.getKey();
List<AllocateResponse> responses = entry.getValue();
if (responses.size() > 0) {
@ -1231,14 +1265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
response.getCompletedContainersStatuses());
cacheAllocatedContainers(response.getAllocatedContainers(),
subClusterId);
mergeAllocateResponse(homeResponse, response, subClusterId);
mergeAllocateResponse(mergedResponse, response, subClusterId);
}
responses.clear();
}
}
}
return homeResponse;
}
/**
@ -1256,11 +1288,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
* Helper method for merging the responses from the secondary sub cluster RMs
* with the home response to return to the AM.
* Helper method for merging the registration responses from the secondary sub
* cluster RMs into the allocate response to return to the AM.
*/
private AllocateResponse mergeRegistrationResponses(
AllocateResponse homeResponse,
private void mergeRegistrationResponses(AllocateResponse homeResponse,
Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
@ -1292,13 +1323,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
}
return homeResponse;
}
private void mergeAllocateResponse(AllocateResponse homeResponse,
AllocateResponse otherResponse, SubClusterId otherRMAddress) {
if (otherResponse.getAMRMToken() != null) {
// Propagate only the new amrmToken from home sub-cluster back to
// AMRMProxyService
if (otherRMAddress.equals(this.homeSubClusterId)) {
homeResponse.setAMRMToken(otherResponse.getAMRMToken());
} else {
throw new YarnRuntimeException(
"amrmToken from UAM " + otherRMAddress + " should be null here");
}
}
if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
homeResponse.getAllocatedContainers()
@ -1406,9 +1446,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
SubClusterId subClusterId) {
for (Container container : containers) {
LOG.debug("Adding container {}", container);
if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
SubClusterId existingSubClusterId =
containerIdToSubClusterIdMap.get(container.getId());
this.containerIdToSubClusterIdMap.get(container.getId());
if (existingSubClusterId.equals(subClusterId)) {
/*
* When RM fails over, the new RM master might send out the same
@ -1441,7 +1482,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
}
}
@ -1463,7 +1504,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
newRequest.setProgress(originalAMRequest.getProgress());
requestMap.put(subClusterId, newRequest);
}
return newRequest;
}
@ -1472,7 +1512,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private static AllocateRequest createAllocateRequest() {
AllocateRequest request =
AllocateRequest.newInstance(0, 0, null, null, null);
RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
request.setAskList(new ArrayList<ResourceRequest>());
request.setReleaseList(new ArrayList<ContainerId>());
ResourceBlacklistRequest blackList =
@ -1525,6 +1565,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return this.uamPool.getAllUAMIds().size();
}
@VisibleForTesting
protected UnmanagedAMPoolManager getUnmanagedAMPool() {
return this.uamPool;
}
@VisibleForTesting
public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
return this.asyncResponseSink;
@ -1535,9 +1580,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
private SubClusterId subClusterId;
private boolean isUAM;
HeartbeatCallBack(SubClusterId subClusterId) {
HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
this.subClusterId = subClusterId;
this.isUAM = isUAM;
}
@Override
@ -1551,16 +1598,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
asyncResponseSink.put(subClusterId, responses);
}
responses.add(response);
// Notify main thread about the response arrival
asyncResponseSink.notifyAll();
}
// Save the new AMRMToken for the UAM if present
if (response.getAMRMToken() != null) {
if (this.isUAM && response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null);
// Do not further propagate the new amrmToken for UAM
response.setAMRMToken(null);
// Update the token in registry or NMSS
if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
subClusterId.getId(), newToken);
if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
subClusterId.getId(), newToken)) {
try {
AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(newToken.getIdentifier())));
LOG.info(
"Received new UAM amrmToken with keyId {} and "
+ "service {} from {} for {}, written to Registry",
identifier.getKeyId(), newToken.getService(), subClusterId,
attemptId);
} catch (IOException e) {
}
}
} else if (getNMStateStore() != null) {
try {
getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
@ -1573,11 +1637,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
// Notify policy of secondary sub-cluster responses
// Notify policy of allocate response
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+ subClusterId, e);
}
}

View File

@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -95,6 +97,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private int testAppId;
private ApplicationAttemptId attemptId;
private volatile int lastResponseId;
@Override
public void setUp() throws IOException {
super.setUp();
@ -120,6 +124,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
attemptId, "test-user", null, null, null, registry));
interceptor.cleanupRegistry();
lastResponseId = 0;
}
@Override
@ -174,8 +180,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExcepted) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList =
@ -187,22 +191,31 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateRequest.setAskList(askList);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response", allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
while (containers.size() < numberOfAllocationExcepted
&& numHeartbeat++ < 10) {
allocateResponse =
interceptor.allocate(Records.newRecord(AllocateRequest.class));
allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response",
allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers());
@ -220,8 +233,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
throws Exception {
Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
@ -229,8 +240,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateRequest.setReleaseList(relList);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
// The release request will be split and handled by the corresponding UAM.
// The release containers returned by the mock resource managers will be
@ -244,14 +258,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
LOG.info("Number of containers received in the original request: "
+ Integer.toString(newlyFinished.size()));
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
while (containersForReleasedContainerIds.size() < relList.size()
&& numHeartbeat++ < 10) {
allocateResponse =
interceptor.allocate(Records.newRecord(AllocateRequest.class));
allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
@ -267,65 +288,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
containersForReleasedContainerIds.size());
}
private void checkAMRMToken(Token amrmToken) {
if (amrmToken != null) {
// The token should be the one issued by home MockRM
Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0)));
}
}
@Test
public void testMultipleSubClusters() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers, with sc1 and sc2 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-2"));
// Allocate the first batch of containers, with sc1 and sc2 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-2"));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
// Allocate the second batch of containers, with sc1 and sc3 active
deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
registerSubCluster(SubClusterId.newInstance("SC-3"));
// Allocate the second batch of containers, with sc1 and sc3 active
deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
registerSubCluster(SubClusterId.newInstance("SC-3"));
numberOfContainers = 1;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
numberOfContainers = 1;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Allocate the third batch of containers with only in home sub-cluster
// active
deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate the third batch of containers with only in home sub-cluster
// active
deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
numberOfContainers = 2;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
numberOfContainers = 2;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Release all containers
releaseContainersAndAssert(containers);
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
FinishApplicationMasterResponse finshResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
return null;
}
});
}
/*
@ -333,49 +370,58 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
*/
@Test
public void testReregister() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate the first batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
interceptor.setShouldReRegisterNext();
interceptor.setShouldReRegisterNext();
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
interceptor.setShouldReRegisterNext();
interceptor.setShouldReRegisterNext();
// Release all containers
releaseContainersAndAssert(containers);
// Release all containers
releaseContainersAndAssert(containers);
interceptor.setShouldReRegisterNext();
interceptor.setShouldReRegisterNext();
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
FinishApplicationMasterResponse finshResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
return null;
}
});
}
/*
@ -442,6 +488,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// Use port number 1001 to let mock RM block in the register call
response = interceptor.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null));
lastResponseId = 0;
} catch (Exception e) {
LOG.info("Register thread exception", e);
response = null;
@ -460,9 +507,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
testRecover(null);
}
public void testRecover(final RegistryOperations registryObj) throws Exception {
ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
protected void testRecover(final RegistryOperations registryObj)
throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
interceptor = new TestableFederationInterceptor();
@ -480,6 +529,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
@ -492,6 +542,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId);
@ -517,22 +570,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(Integer.MAX_VALUE,
interceptor.getLastHomeResponseId());
// The first allocate call expects a fail-over exception and re-register
int responseId = 10;
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(responseId);
try {
interceptor.allocate(allocateRequest);
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse =
interceptor.allocate(allocateRequest);
lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) {
}
interceptor.registerApplicationMaster(registerReq);
Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
lastResponseId = 0;
// Release all containers
releaseContainersAndAssert(containers);
@ -614,6 +666,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
}
}
@ -629,6 +682,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
// Register the application second time with a different request obj
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
@ -637,6 +691,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
registerReq.setTrackingUrl("different");
try {
registerResponse = interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
Assert.fail("Should throw if a different request obj is used");
} catch (YarnException e) {
}
@ -689,20 +744,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
@Test
public void testSecondAttempt() throws Exception {
ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
final RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
@ -714,10 +771,13 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
System.out.println(c.getId() + " ha");
LOG.info("Allocated container " + c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Preserve the mock RM instances for secondaries
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
@ -729,8 +789,20 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor = new TestableFederationInterceptor(null, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registry));
registerResponse = interceptor.registerApplicationMaster(registerReq);
return null;
}
});
// Update the ugi with new attemptId
ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(numberOfContainers,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the FederationInterceptor and overrides methods to provide a testable
* implementation of FederationInterceptor.
*/
public class TestableFederationInterceptor extends FederationInterceptor {
public static final Logger LOG =
LoggerFactory.getLogger(TestableFederationInterceptor.class);
private ConcurrentHashMap<String, MockResourceManagerFacade>
secondaryResourceManagers = new ConcurrentHashMap<>();
private AtomicInteger runningIndex = new AtomicInteger(0);
@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return new TestableUnmanagedAMPoolManager(threadPool);
}
@Override
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
Configuration conf, ApplicationId appId) {
return new TestableAMRequestHandlerThread(conf, appId);
}
@SuppressWarnings("unchecked")
@Override
protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return secondaryResourceManagers;
}
protected MockResourceManagerFacade getSecondaryRM(String scId) {
return secondaryResourceManagers.get(scId);
}
/**
* Drain all aysnc heartbeat threads, comes in two favors:
*
* 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
* pick up all pending heartbeat requests. Not necessarily wait for all
* threads to finish processing the last request. This is used to make sure
* all new UAM are launched by the async threads, but at the same time will
* finish draining while (slow) RM is still processing the last heartbeat
* request.
*
* 2. waitForAsyncHBThreadFinish == true. Wait for all async thread to finish
* processing all heartbeat requests.
*/
protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish)
throws YarnException {
LOG.info("waiting to drain home heartbeat handler");
if (waitForAsyncHBThreadFinish) {
getHomeHeartbeartHandler().drainHeartbeatThread();
} else {
while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
LOG.info("waiting to drain UAM heartbeat handlers");
UnmanagedAMPoolManager uamPool = getUnmanagedAMPool();
if (waitForAsyncHBThreadFinish) {
getUnmanagedAMPool().drainUAMHeartbeats();
} else {
while (true) {
boolean done = true;
for (String scId : uamPool.getAllUAMIds()) {
if (uamPool.getRequestQueueSize(scId) > 0) {
done = false;
break;
}
}
if (done) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
}
protected UserGroupInformation getUGIWithToken(
ApplicationAttemptId appAttemptId) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
ugi.addTokenIdentifier(token);
return ugi;
}
/**
* Extends the UnmanagedAMPoolManager and overrides methods to provide a
* testable implementation of UnmanagedAMPoolManager.
@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts, "TEST");
setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
}
/**
@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor {
YarnConfiguration.getClusterId(config));
}
}
/**
* Wrap the handler thread so it calls from the same user.
*/
protected class TestableAMRequestHandlerThread
extends AMHeartbeatRequestHandler {
public TestableAMRequestHandlerThread(Configuration conf,
ApplicationId applicationId) {
super(conf, applicationId);
}
@Override
public void run() {
try {
getUGIWithToken(getAttemptId())
.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
TestableAMRequestHandlerThread.super.run();
return null;
}
});
} catch (Exception e) {
}
}
}
}

View File

@ -80,7 +80,6 @@ import com.google.common.annotations.VisibleForTesting;
public class ApplicationMasterService extends AbstractService implements
ApplicationMasterProtocol {
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private static final int PRE_REGISTER_RESPONSE_ID = -1;
private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler;
@ -332,11 +331,6 @@ public class ApplicationMasterService extends AbstractService implements
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
private int getNextResponseId(int responseId) {
// Loop between 0 to Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;
}
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
@ -370,8 +364,8 @@ public class ApplicationMasterService extends AbstractService implements
}
// Normally request.getResponseId() == lastResponse.getResponseId()
if (getNextResponseId(request.getResponseId()) == lastResponse
.getResponseId()) {
if (AMRMClientUtils.getNextResponseId(
request.getResponseId()) == lastResponse.getResponseId()) {
// heartbeat one step old, simply return lastReponse
return lastResponse;
} else if (request.getResponseId() != lastResponse.getResponseId()) {
@ -416,7 +410,8 @@ public class ApplicationMasterService extends AbstractService implements
* need to worry about unregister call occurring in between (which
* removes the lock object).
*/
response.setResponseId(getNextResponseId(lastResponse.getResponseId()));
response.setResponseId(
AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
lock.setAllocateResponse(response);
return response;
}
@ -427,7 +422,7 @@ public class ApplicationMasterService extends AbstractService implements
recordFactory.newRecordInstance(AllocateResponse.class);
// set response id to -1 before application master for the following
// attemptID get registered
response.setResponseId(PRE_REGISTER_RESPONSE_ID);
response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, new AllocateResponseLock(response));
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);