diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a82801d620e..d69ae57ca9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3221,6 +3221,11 @@ public class YarnConfiguration extends Configuration { "org.apache.hadoop.yarn.server.federation.resolver." + "DefaultSubClusterResolverImpl"; + // the maximum wait time for the first async heartbeat response + public static final String FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = + FEDERATION_PREFIX + "amrmproxy.hb.maximum.wait.ms"; + public static final long DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = 5000; + // AMRMProxy split-merge timeout for active sub-clusters. We will not route // new asks to expired sub-clusters. 
public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index d63933cac18..6f781fa7c0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -105,6 +105,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS); configurationPropsToSkipCompare .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index b8319cd2d36..34a9b34fc74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -52,6 +52,8 @@ public final class AMRMClientUtils { private static final Logger LOG = LoggerFactory.getLogger(AMRMClientUtils.class); + public static final int PRE_REGISTER_RESPONSE_ID = -1; + public static final String APP_ALREADY_REGISTERED_MESSAGE = "Application Master is already registered : "; @@ -152,6 +154,11 @@ public final class AMRMClientUtils { } } + public static int 
getNextResponseId(int responseId) { + // Wrap around within 0 to Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + public static void addToOutstandingSchedulingRequests( Collection requests, Map, List> outstandingSchedRequests) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java index 42227bb61d9..380c2169eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java @@ -47,6 +47,9 @@ public class AMHeartbeatRequestHandler extends Thread { // Indication flag for the thread to keep running private volatile boolean keepRunning; + // For unit test draining + private volatile boolean isThreadWaiting; + private Configuration conf; private ApplicationId applicationId; @@ -61,6 +64,7 @@ public class AMHeartbeatRequestHandler extends Thread { this.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); this.keepRunning = true; + this.isThreadWaiting = false; this.conf = conf; this.applicationId = applicationId; @@ -82,12 +86,15 @@ public class AMHeartbeatRequestHandler extends Thread { while (keepRunning) { AsyncAllocateRequestInfo requestInfo; try { - requestInfo = requestQueue.take(); + this.isThreadWaiting = true; + requestInfo = this.requestQueue.take(); + this.isThreadWaiting = false; + if (requestInfo == null) { throw new YarnException( "Null requestInfo taken from request queue"); } - if (!keepRunning) { + if (!this.keepRunning) { break; } @@ -98,7 +105,7 @@ public class AMHeartbeatRequestHandler extends 
Thread { throw new YarnException("Null allocateRequest from requestInfo"); } if (LOG.isDebugEnabled()) { - LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" + LOG.debug("Sending Heartbeat to RM. AskList:" + ((request.getAskList() == null) ? " empty" : request.getAskList().size())); } @@ -181,6 +188,16 @@ public class AMHeartbeatRequestHandler extends Thread { } } + @VisibleForTesting + public void drainHeartbeatThread() { + while (!this.isThreadWaiting || this.requestQueue.size() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + @VisibleForTesting public int getRequestQueueSize() { return this.requestQueue.size(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java index 662431836a7..13545c9d2ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -78,7 +78,7 @@ public class FederationRegistryClient { * * @return the list of known applications */ - public List getAllApplications() { + public synchronized List getAllApplications() { // Suppress the exception here because it is valid that the entry does not // exist List applications = null; @@ -99,7 +99,7 @@ public class FederationRegistryClient { * For testing, delete all application records in registry. 
*/ @VisibleForTesting - public void cleanAllApplications() { + public synchronized void cleanAllApplications() { try { removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), true, false); @@ -115,7 +115,7 @@ public class FederationRegistryClient { * @param token the UAM of the application * @return whether the amrmToken is added or updated to a new value */ - public boolean writeAMRMTokenForUAM(ApplicationId appId, + public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId, String subClusterId, Token token) { Map> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); @@ -154,7 +154,7 @@ public class FederationRegistryClient { * @param appId application id * @return the sub-cluster to UAM token mapping */ - public Map> + public synchronized Map> loadStateFromRegistry(ApplicationId appId) { Map> retMap = new HashMap<>(); // Suppress the exception here because it is valid that the entry does not @@ -203,7 +203,7 @@ public class FederationRegistryClient { * * @param appId application id */ - public void removeAppFromRegistry(ApplicationId appId) { + public synchronized void removeAppFromRegistry(ApplicationId appId) { Map> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); LOG.info("Removing all registry entries for {}", appId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index d708cedec17..d5a0168175e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -407,4 +407,19 @@ public class 
UnmanagedAMPoolManager extends AbstractService { return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer(); } + @VisibleForTesting + public int getRequestQueueSize(String uamId) throws YarnException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + return this.unmanagedAppMasterMap.get(uamId).getRequestQueueSize(); + } + + @VisibleForTesting + public void drainUAMHeartbeats() { + for (UnmanagedApplicationManager uam : this.unmanagedAppMasterMap + .values()) { + uam.drainHeartbeatThread(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 7c1e154c5e7..91d5d6c0bed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -225,6 +225,10 @@ public class UnmanagedApplicationManager { LOG.debug("RegisterUAM returned existing NM token for node " + nmToken.getNodeId()); } + LOG.info( + "RegisterUAM returned {} existing running container and {} NM tokens", + response.getContainersFromPreviousAttempts().size(), + response.getNMTokensFromPreviousAttempts().size()); // Only when register succeed that we start the heartbeat thread this.heartbeatHandler.setDaemon(true); @@ -516,4 +520,14 @@ public class UnmanagedApplicationManager { public int getRequestQueueSize() { return this.heartbeatHandler.getRequestQueueSize(); } + + @VisibleForTesting + protected void setHandlerThread(AMHeartbeatRequestHandler thread) { + 
this.heartbeatHandler = thread; + } + + @VisibleForTesting + protected void drainHeartbeatThread() { + this.heartbeatHandler.drainHeartbeatThread(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 958b1f18f70..50a4bff51d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -189,8 +189,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, private HashSet applicationMap = new HashSet<>(); private HashSet keepContainerOnUams = new HashSet<>(); - private HashMap> applicationContainerIdMap = new HashMap<>(); + private HashMap> applicationContainerIdMap = + new HashMap<>(); + private int rmId; private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -203,6 +204,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, private boolean shouldReRegisterNext = false; + private boolean shouldWaitForSyncNextAllocate = false; + // For unit test synchronization private static Object syncObj = new Object(); @@ -218,6 +221,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public MockResourceManagerFacade(Configuration conf, int startContainerIndex, int subClusterId, boolean isRunning) { this.conf = conf; + this.rmId = startContainerIndex; this.containerIndex.set(startContainerIndex); this.subClusterId = subClusterId; this.isRunning = isRunning; @@ -259,17 +263,17 
@@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Registering application attempt: " + attemptId); + ApplicationId appId = attemptId.getApplicationId(); List containersFromPreviousAttempt = null; synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(attemptId)) { - if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + if (applicationContainerIdMap.containsKey(appId)) { + if (keepContainerOnUams.contains(appId)) { // For UAM with the keepContainersFromPreviousAttempt flag, return all // running containers containersFromPreviousAttempt = new ArrayList<>(); - for (ContainerId containerId : applicationContainerIdMap - .get(attemptId)) { + for (ContainerId containerId : applicationContainerIdMap.get(appId)) { containersFromPreviousAttempt.add(Container.newInstance(containerId, null, null, null, null, null)); } @@ -279,7 +283,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } } else { // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(attemptId, new ArrayList()); + applicationContainerIdMap.put(appId, new ArrayList()); } } @@ -314,6 +318,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Finishing application attempt: " + attemptId); + ApplicationId appId = attemptId.getApplicationId(); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -324,8 +329,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application Assert.assertTrue("The application id is NOT registered: " + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - 
applicationContainerIdMap.remove(attemptId); + applicationContainerIdMap.containsKey(appId)); + applicationContainerIdMap.remove(appId); } return FinishApplicationMasterResponse.newInstance( @@ -350,6 +355,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Allocate from application attempt: " + attemptId); + ApplicationId appId = attemptId.getApplicationId(); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -357,6 +363,21 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throw new ApplicationMasterNotRegisteredException(message); } + // Wait for signal for certain test cases + synchronized (syncObj) { + if (shouldWaitForSyncNextAllocate) { + shouldWaitForSyncNextAllocate = false; + + LOG.info("Allocate call in RM start waiting"); + try { + syncObj.wait(); + LOG.info("Allocate call in RM wait finished"); + } catch (InterruptedException e) { + LOG.info("Allocate call in RM wait interrupted", e); + } + } + } + ArrayList containerList = new ArrayList(); if (request.getAskList() != null) { for (ResourceRequest rr : request.getAskList()) { @@ -381,9 +402,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - List ids = applicationContainerIdMap.get(attemptId); + + appId, + applicationContainerIdMap.containsKey(appId)); + List ids = applicationContainerIdMap.get(appId); ids.add(containerId); } } @@ -395,12 +416,10 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); synchronized (applicationContainerIdMap) { - Assert - .assertTrue( - "The application id is not registered 
before allocate(): " - + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - List ids = applicationContainerIdMap.get(attemptId); + Assert.assertTrue( + "The application id is not registered before allocate(): " + appId, + applicationContainerIdMap.containsKey(appId)); + List ids = applicationContainerIdMap.get(appId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -426,7 +445,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + " for application attempt: " + conf.get("AMRMTOKEN")); // Always issue a new AMRMToken as if RM rolled master key - Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); + Token newAMRMToken = Token.newInstance(new byte[0], + Integer.toString(this.rmId), new byte[0], ""); return AllocateResponse.newInstance(0, completedList, containerList, new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, @@ -434,6 +454,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, new ArrayList()); } + public void setWaitForSyncNextAllocate(boolean wait) { + synchronized (syncObj) { + shouldWaitForSyncNextAllocate = wait; + } + } + @Override public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException { @@ -624,14 +650,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - ApplicationAttemptId attemptId = request.getApplicationAttemptId(); + ApplicationId appId = request.getApplicationAttemptId().getApplicationId(); List containers = new ArrayList<>(); synchronized (applicationContainerIdMap) { // Return the list of running containers that were being tracked for this // application - Assert.assertTrue("The application id is NOT registered: " + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - List ids = applicationContainerIdMap.get(attemptId); + Assert.assertTrue("The application id is NOT registered: " + 
appId, + applicationContainerIdMap.containsKey(appId)); + List ids = applicationContainerIdMap.get(appId); for (ContainerId c : ids) { containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0, null, null, 0, null, null)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 1bf882ffc41..c02296de1fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -62,14 +64,18 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import 
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; @@ -80,9 +86,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; -import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +122,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor { NMSS_CLASS_PREFIX + "secondarySC/"; public static final String STRING_TO_BYTE_FORMAT = "UTF-8"; + private static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + + /** + * From AM's perspective, FederationInterceptor behaves exactly the same as + * YarnRM (ApplicationMasterService). This is to remember the last heart beat + * response, used to handle duplicate heart beat and responseId from AM. 
+ */ + private AllocateResponse lastAllocateResponse; + private final Object lastAllocateResponseLock = new Object(); + private ApplicationAttemptId attemptId; /** @@ -124,7 +141,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private AMRMClientRelayer homeRMRelayer; private SubClusterId homeSubClusterId; - private volatile int lastHomeResponseId; + private AMHeartbeatRequestHandler homeHeartbeartHandler; /** * UAM pool for secondary sub-clusters (ones other than home sub-cluster), @@ -146,7 +163,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * Stores the AllocateResponses that are received asynchronously from all the - * sub-cluster resource managers except the home RM. + * sub-cluster resource managers, including home RM. */ private Map> asyncResponseSink; @@ -194,14 +211,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** The policy used to split requests among sub-clusters. */ private FederationAMRMProxyPolicy policyInterpreter; - /** - * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken - * issued by home RM. - */ - private UserGroupInformation appOwner; - private FederationRegistryClient registryClient; + // the maximum wait time for the first async heart beat response + private long heartbeatMaxWaitTimeMs; + + private MonotonicClock clock = new MonotonicClock(); + /** * Creates an instance of the FederationInterceptor class. 
*/ @@ -213,7 +229,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.secondaryRelayers = new ConcurrentHashMap<>(); this.amRegistrationRequest = null; this.amRegistrationResponse = null; - this.lastHomeResponseId = Integer.MAX_VALUE; this.justRecovered = false; } @@ -233,8 +248,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { setConf(conf); } + // The proxy ugi used to talk to home RM as well as Yarn Registry, loaded + // with the up-to-date AMRMToken issued by home RM. + UserGroupInformation appOwner; try { - this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), + appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), UserGroupInformation.getCurrentUser()); } catch (Exception ex) { throw new YarnRuntimeException(ex); @@ -242,10 +260,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (appContext.getRegistryClient() != null) { this.registryClient = new FederationRegistryClient(conf, - appContext.getRegistryClient(), this.appOwner); + appContext.getRegistryClient(), appOwner); // Add all app tokens for Yarn Registry access if (appContext.getCredentials() != null) { - this.appOwner.addCredentials(appContext.getCredentials()); + appOwner.addCredentials(appContext.getCredentials()); } } @@ -254,9 +272,21 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, - ApplicationMasterProtocol.class, this.appOwner), appId, + ApplicationMasterProtocol.class, appOwner), appId, this.homeSubClusterId.toString()); + this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId); + this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer); + this.homeHeartbeartHandler.setUGI(appOwner); + this.homeHeartbeartHandler.setDaemon(true); + 
this.homeHeartbeartHandler.start(); + + // set lastResponseId to -1 before application master registers + this.lastAllocateResponse = + RECORD_FACTORY.newRecordInstance(AllocateResponse.class); + this.lastAllocateResponse + .setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID); + this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -265,6 +295,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.uamPool.init(conf); this.uamPool.start(); + + this.heartbeatMaxWaitTimeMs = + conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS); } @Override @@ -272,6 +306,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { super.recover(recoveredDataMap); LOG.info("Recovering data for FederationInterceptor for {}", this.attemptId); + this.justRecovered = true; + if (recoveredDataMap == null) { return; } @@ -294,9 +330,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb); LOG.info("amRegistrationResponse recovered for {}", this.attemptId); - // Trigger re-register and full pending re-send only if we have a - // saved register response. This should always be true though. 
- this.justRecovered = true; } // Recover UAM amrmTokens from registry or NMSS @@ -355,6 +388,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { .getContainersFromPreviousAttempts()) { containerIdToSubClusterIdMap.put(container.getId(), subClusterId); containers++; + LOG.debug(" From subcluster " + subClusterId + + " running container " + container.getId()); } LOG.info("Recovered {} running containers from UAM in {}", response.getContainersFromPreviousAttempts().size(), @@ -384,7 +419,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { LOG.debug(" From home RM " + this.homeSubClusterId + " running container " + container.getContainerId()); } - LOG.info("{} running containers including AM recovered from home RM ", + LOG.info("{} running containers including AM recovered from home RM {}", response.getContainerList().size(), this.homeSubClusterId); LOG.info( @@ -411,8 +446,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * so that when AM registers more than once, it returns the same register * success response instead of throwing * {@link InvalidApplicationMasterRequestException}. Furthermore, we present - * to AM as if we are the RM that never fails over. When actual RM fails over, - * we always re-register automatically. + * to AM as if we are the RM that never fails over (except when AMRMProxy + * restarts). When actual RM fails over, we always re-register automatically. 
* * We did this because FederationInterceptor can receive concurrent register * requests from AM because of timeout between AM and AMRMProxy, which is @@ -425,6 +460,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public synchronized RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnException, IOException { + + // Reset the heartbeat responseId to zero upon register + synchronized (this.lastAllocateResponseLock) { + this.lastAllocateResponse.setResponseId(0); + } + this.justRecovered = false; + // If AM is calling with a different request, complain if (this.amRegistrationRequest != null) { if (!this.amRegistrationRequest.equals(request)) { @@ -524,34 +566,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ @Override public AllocateResponse allocate(AllocateRequest request) - throws YarnException { + throws YarnException, IOException { Preconditions.checkArgument(this.policyInterpreter != null, "Allocate should be called after registerApplicationMaster"); - if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) { - // Save the responseId home RM is expecting - this.lastHomeResponseId = request.getResponseId(); - + if (this.justRecovered) { throw new ApplicationMasterNotRegisteredException( "AMRMProxy just restarted and recovered for " + this.attemptId + ". AM should re-register and full re-send pending requests."); } - // Override responseId in the request in two cases: - // - // 1. After we just recovered after an NM restart and AM's responseId is - // reset due to the exception we generate. We need to override the - // responseId to the one homeRM expects. - // - // 2. After homeRM fail-over, the allocate response with reseted responseId - // might not be returned successfully back to AM because of RPC connection - // timeout between AM and AMRMProxy. In this case, we remember and reset the - // responseId for AM. 
- if (this.justRecovered - || request.getResponseId() > this.lastHomeResponseId) { - LOG.warn("Setting allocate responseId for {} from {} to {}", - this.attemptId, request.getResponseId(), this.lastHomeResponseId); - request.setResponseId(this.lastHomeResponseId); + // Check responseId and handle duplicate heartbeat exactly the same as RM + synchronized (this.lastAllocateResponseLock) { + LOG.info("Heartbeat from " + this.attemptId + " with responseId " + + request.getResponseId() + " when we are expecting " + + this.lastAllocateResponse.getResponseId()); + // Normally request.getResponseId() == lastResponse.getResponseId() + if (AMRMClientUtils.getNextResponseId( + request.getResponseId()) == this.lastAllocateResponse + .getResponseId()) { + // heartbeat one step old, simply return lastResponse + return this.lastAllocateResponse; + } else if (request.getResponseId() != this.lastAllocateResponse + .getResponseId()) { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId, + this.lastAllocateResponse.getResponseId(), + request.getResponseId())); + } } try { @@ -560,71 +602,55 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Map requests = splitAllocateRequest(request); - // Send the requests to the secondary sub-cluster resource managers. - // These secondary requests are send asynchronously and the responses will - // be collected and merged with the home response. In addition, it also - // return the newly registered Unmanaged AMs. - Registrations newRegistrations = - sendRequestsToSecondaryResourceManagers(requests); + /** + * Send the requests to all the sub-cluster resource managers. All + * requests are synchronously triggered but sent asynchronously. Later the + * responses will be collected and merged. In addition, it also returns + * the newly registered UAMs. 
+ */ + Registrations newRegistrations = sendRequestsToResourceManagers(requests); - // Send the request to the home RM and get the response - AllocateRequest homeRequest = requests.get(this.homeSubClusterId); - LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId, - homeRequest.getResponseId()); - - AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest); - - // Reset the flag after the first successful homeRM allocate response, - // otherwise keep overriding the responseId of new allocate request - if (this.justRecovered) { - this.justRecovered = false; + // Wait for the first async response to arrive + long startTime = this.clock.getTime(); + synchronized (this.asyncResponseSink) { + try { + this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs); + } catch (InterruptedException e) { + } } + long firstResponseTime = this.clock.getTime() - startTime; - // Notify policy of home response + // An extra brief wait for other async heart beats, so that most of their + // responses can make it back to AM in the same heart beat round trip. 
try { - this.policyInterpreter.notifyOfResponse(this.homeSubClusterId, - homeResponse); - } catch (YarnException e) { - LOG.warn("notifyOfResponse for policy failed for home sub-cluster " - + this.homeSubClusterId, e); + Thread.sleep(firstResponseTime); + } catch (InterruptedException e) { } - // If the resource manager sent us a new token, add to the current user - if (homeResponse.getAMRMToken() != null) { - LOG.debug("Received new AMRMToken"); - YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(), - this.appOwner, getConf()); - } + // Prepare the response to AM + AllocateResponse response = + RECORD_FACTORY.newRecordInstance(AllocateResponse.class); - // Merge the responses from home and secondary sub-cluster RMs - homeResponse = mergeAllocateResponses(homeResponse); + // Merge all responses from response sink + mergeAllocateResponses(response); // Merge the containers and NMTokens from the new registrations into - // the homeResponse. + // the response if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) { - homeResponse = mergeRegistrationResponses(homeResponse, + mergeRegistrationResponses(response, newRegistrations.getSuccessfulRegistrations()); } - LOG.info("{} heartbeat response from home RM with responseId {}", - this.attemptId, homeResponse.getResponseId()); - - // Update lastHomeResponseId in three cases: - // 1. The normal responseId increments - // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails - // over, AMRMClientRelayer auto re-register and full re-send for homeRM. - // 3. lastHomeResponseId == MAX_INT. 
This is the initial case or - // responseId about to overflow and wrap around - if (homeResponse.getResponseId() == this.lastHomeResponseId + 1 - || homeResponse.getResponseId() == 1 - || this.lastHomeResponseId == Integer.MAX_VALUE) { - this.lastHomeResponseId = homeResponse.getResponseId(); + // update the responseId and return the final response to AM + synchronized (this.lastAllocateResponseLock) { + response.setResponseId(AMRMClientUtils + .getNextResponseId(this.lastAllocateResponse.getResponseId())); + this.lastAllocateResponse = response; } - - // return the final response to the application master. - return homeResponse; - } catch (IOException ex) { - LOG.error("Exception encountered while processing heart beat", ex); + return response; + } catch (Throwable ex) { + LOG.error("Exception encountered while processing heart beat for " + + this.attemptId, ex); throw new YarnException(ex); } } @@ -696,6 +722,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { FinishApplicationMasterResponse homeResponse = this.homeRMRelayer.finishApplicationMaster(request); + // Stop the home heartbeat thread + this.homeHeartbeartHandler.shutdown(); + if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the // response and merge it with the home response @@ -758,10 +787,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } this.threadpool = null; } - homeRMRelayer.shutdown(); - for(AMRMClientRelayer relayer : secondaryRelayers.values()){ + + // Stop the home heartbeat thread + this.homeHeartbeartHandler.shutdown(); + this.homeRMRelayer.shutdown(); + for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) { relayer.shutdown(); } + super.shutdown(); } @@ -781,8 +814,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } @VisibleForTesting - protected int getLastHomeResponseId() { - return this.lastHomeResponseId; + protected ApplicationAttemptId getAttemptId() 
{ + return this.attemptId; + } + + @VisibleForTesting + protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() { + return this.homeHeartbeartHandler; } /** @@ -798,6 +836,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return new UnmanagedAMPoolManager(threadPool); } + @VisibleForTesting + protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( + Configuration conf, ApplicationId appId) { + return new AMHeartbeatRequestHandler(conf, appId); + } + /** * Create a proxy instance that is used to connect to the Home resource * manager. @@ -872,7 +916,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + "Reattaching in parallel", uamMap.size(), appId); ExecutorCompletionService - completionService = new ExecutorCompletionService<>(threadpool); + completionService = new ExecutorCompletionService<>(this.threadpool); for (Entry> entry : uamMap.entrySet()) { final SubClusterId subClusterId = @@ -1047,16 +1091,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * This methods sends the specified AllocateRequests to the appropriate - * sub-cluster resource managers. + * sub-cluster resource managers asynchronously. 
* * @param requests contains the heart beat requests to send to the resource - * manager keyed by the resource manager address + * manager keyed by the sub-cluster id * @return the registration responses from the newly added sub-cluster * resource managers * @throws YarnException * @throws IOException */ - private Registrations sendRequestsToSecondaryResourceManagers( + private Registrations sendRequestsToResourceManagers( Map requests) throws YarnException, IOException { @@ -1065,32 +1109,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Registrations registrations = registerWithNewSubClusters(requests.keySet()); // Now that all the registrations are done, send the allocation request - // to the sub-cluster RMs using the Unmanaged application masters - // asynchronously and don't wait for the response. The responses will - // arrive asynchronously and will be added to the response sink. These - // responses will be sent to the application master in some future heart - // beat response. + // to the sub-cluster RMs asynchronously and don't wait for the response. + // The responses will arrive asynchronously and will be added to the + // response sink, then merged and sent to the application master. for (Entry entry : requests.entrySet()) { - final SubClusterId subClusterId = entry.getKey(); + SubClusterId subClusterId = entry.getKey(); if (subClusterId.equals(this.homeSubClusterId)) { - // Skip the request for the home sub-cluster resource manager. - // It will be handled separately in the allocate() method - continue; + // Request for the home sub-cluster resource manager + this.homeHeartbeartHandler.allocateAsync(entry.getValue(), + new HeartbeatCallBack(this.homeSubClusterId, false)); + } else { + if (!this.uamPool.hasUAMId(subClusterId.getId())) { + // TODO: This means that the registration for this sub-cluster RM + // failed. For now, we ignore the resource requests and continue + // but we need to fix this and handle this situation. 
One way would + // be to send the request to another RM by consulting the policy. + LOG.warn("Unmanaged AM registration not found for sub-cluster {}", + subClusterId); + continue; + } + this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), + new HeartbeatCallBack(subClusterId, true)); } - - if (!this.uamPool.hasUAMId(subClusterId.getId())) { - // TODO: This means that the registration for this sub-cluster RM - // failed. For now, we ignore the resource requests and continue - // but we need to fix this and handle this situation. One way would - // be to send the request to another RM by consulting the policy. - LOG.warn("Unmanaged AM registration not found for sub-cluster {}", - subClusterId); - continue; - } - - this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), - new HeartbeatCallBack(subClusterId)); } return registrations; @@ -1123,7 +1164,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationRequest; final AMRMProxyApplicationContext appContext = getApplicationContext(); ExecutorCompletionService - completionService = new ExecutorCompletionService<>(threadpool); + completionService = new ExecutorCompletionService<>(this.threadpool); for (final String subClusterId : newSubClusters) { completionService @@ -1208,21 +1249,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Merges the responses from other sub-clusters that we received - * asynchronously with the specified home cluster response and keeps track of - * the containers received from each sub-cluster resource managers. + * Merge the responses from all sub-clusters that we received asynchronously + * and keeps track of the containers received from each sub-cluster resource + * managers. */ - private AllocateResponse mergeAllocateResponses( - AllocateResponse homeResponse) { - // Timing issue, we need to remove the completed and then save the new ones. 
- removeFinishedContainersFromCache( - homeResponse.getCompletedContainersStatuses()); - cacheAllocatedContainers(homeResponse.getAllocatedContainers(), - this.homeSubClusterId); - + private void mergeAllocateResponses(AllocateResponse mergedResponse) { synchronized (this.asyncResponseSink) { - for (Entry> entry : asyncResponseSink - .entrySet()) { + for (Entry> entry : + this.asyncResponseSink.entrySet()) { SubClusterId subClusterId = entry.getKey(); List responses = entry.getValue(); if (responses.size() > 0) { @@ -1231,14 +1265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { response.getCompletedContainersStatuses()); cacheAllocatedContainers(response.getAllocatedContainers(), subClusterId); - mergeAllocateResponse(homeResponse, response, subClusterId); + mergeAllocateResponse(mergedResponse, response, subClusterId); } responses.clear(); } } } - - return homeResponse; } /** @@ -1256,11 +1288,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Helper method for merging the responses from the secondary sub cluster RMs - * with the home response to return to the AM. + * Helper method for merging the registration responses from the secondary sub + * cluster RMs into the allocate response to return to the AM. 
*/ - private AllocateResponse mergeRegistrationResponses( - AllocateResponse homeResponse, + private void mergeRegistrationResponses(AllocateResponse homeResponse, Map registrations) { for (Entry entry : @@ -1292,13 +1323,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } } - - return homeResponse; } private void mergeAllocateResponse(AllocateResponse homeResponse, AllocateResponse otherResponse, SubClusterId otherRMAddress) { + if (otherResponse.getAMRMToken() != null) { + // Propagate only the new amrmToken from home sub-cluster back to + // AMRMProxyService + if (otherRMAddress.equals(this.homeSubClusterId)) { + homeResponse.setAMRMToken(otherResponse.getAMRMToken()); + } else { + throw new YarnRuntimeException( + "amrmToken from UAM " + otherRMAddress + " should be null here"); + } + } + if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) { if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) { homeResponse.getAllocatedContainers() @@ -1406,9 +1446,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { SubClusterId subClusterId) { for (Container container : containers) { LOG.debug("Adding container {}", container); - if (containerIdToSubClusterIdMap.containsKey(container.getId())) { + + if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) { SubClusterId existingSubClusterId = - containerIdToSubClusterIdMap.get(container.getId()); + this.containerIdToSubClusterIdMap.get(container.getId()); if (existingSubClusterId.equals(subClusterId)) { /* * When RM fails over, the new RM master might send out the same @@ -1441,7 +1482,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId); } } @@ -1463,7 +1504,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { 
newRequest.setProgress(originalAMRequest.getProgress()); requestMap.put(subClusterId, newRequest); } - return newRequest; } @@ -1472,7 +1512,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private static AllocateRequest createAllocateRequest() { AllocateRequest request = - AllocateRequest.newInstance(0, 0, null, null, null); + RECORD_FACTORY.newRecordInstance(AllocateRequest.class); request.setAskList(new ArrayList()); request.setReleaseList(new ArrayList()); ResourceBlacklistRequest blackList = @@ -1525,6 +1565,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return this.uamPool.getAllUAMIds().size(); } + @VisibleForTesting + protected UnmanagedAMPoolManager getUnmanagedAMPool() { + return this.uamPool; + } + @VisibleForTesting public Map> getAsyncResponseSink() { return this.asyncResponseSink; @@ -1535,9 +1580,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private class HeartbeatCallBack implements AsyncCallback { private SubClusterId subClusterId; + private boolean isUAM; - HeartbeatCallBack(SubClusterId subClusterId) { + HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) { this.subClusterId = subClusterId; + this.isUAM = isUAM; } @Override @@ -1551,16 +1598,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor { asyncResponseSink.put(subClusterId, responses); } responses.add(response); + // Notify main thread about the response arrival + asyncResponseSink.notifyAll(); } // Save the new AMRMToken for the UAM if present - if (response.getAMRMToken() != null) { + if (this.isUAM && response.getAMRMToken() != null) { Token newToken = ConverterUtils .convertFromYarn(response.getAMRMToken(), (Text) null); + // Do not further propagate the new amrmToken for UAM + response.setAMRMToken(null); + // Update the token in registry or NMSS if (registryClient != null) { - registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), - 
subClusterId.getId(), newToken); + if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), + subClusterId.getId(), newToken)) { + try { + AMRMTokenIdentifier identifier = new AMRMTokenIdentifier(); + identifier.readFields(new DataInputStream( + new ByteArrayInputStream(newToken.getIdentifier()))); + LOG.info( + "Received new UAM amrmToken with keyId {} and " + + "service {} from {} for {}, written to Registry", + identifier.getKeyId(), newToken.getService(), subClusterId, + attemptId); + } catch (IOException e) { + } + } } else if (getNMStateStore() != null) { try { getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, @@ -1573,11 +1637,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - // Notify policy of secondary sub-cluster responses + // Notify policy of allocate response try { policyInterpreter.notifyOfResponse(subClusterId, response); } catch (YarnException e) { - LOG.warn("notifyOfResponse for policy failed for home sub-cluster " + LOG.warn("notifyOfResponse for policy failed for sub-cluster " + subClusterId, e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index a837eed14ce..407ae83c981 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executors; import org.apache.hadoop.registry.client.api.RegistryOperations; 
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -95,6 +97,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { private int testAppId; private ApplicationAttemptId attemptId; + private volatile int lastResponseId; + @Override public void setUp() throws IOException { super.setUp(); @@ -120,6 +124,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null, null, registry)); interceptor.cleanupRegistry(); + + lastResponseId = 0; } @Override @@ -174,8 +180,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { private List getContainersAndAssert(int numberOfResourceRequests, int numberOfAllocationExcepted) throws Exception { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(1); - List containers = new ArrayList(numberOfResourceRequests); List askList = @@ -187,22 +191,31 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { allocateRequest.setAskList(askList); + allocateRequest.setResponseId(lastResponseId); AllocateResponse allocateResponse = 
interceptor.allocate(allocateRequest); Assert.assertNotNull("allocate() returned null response", allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); containers.addAll(allocateResponse.getAllocatedContainers()); LOG.info("Number of allocated containers in the original request: " + Integer.toString(allocateResponse.getAllocatedContainers().size())); + // Make sure this request is picked up by all async heartbeat handlers + interceptor.drainAllAsyncQueue(false); + // Send max 10 heart beats to receive all the containers. If not, we will // fail the test int numHeartbeat = 0; while (containers.size() < numberOfAllocationExcepted && numHeartbeat++ < 10) { - allocateResponse = - interceptor.allocate(Records.newRecord(AllocateRequest.class)); + allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull("allocate() returned null response", allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); containers.addAll(allocateResponse.getAllocatedContainers()); @@ -220,8 +233,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { throws Exception { Assert.assertTrue(containers.size() > 0); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(1); - List relList = new ArrayList(containers.size()); for (Container container : containers) { relList.add(container.getId()); @@ -229,8 +240,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { allocateRequest.setReleaseList(relList); + allocateRequest.setResponseId(lastResponseId); AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = 
allocateResponse.getResponseId(); // The release request will be split and handled by the corresponding UAM. // The release containers returned by the mock resource managers will be @@ -244,14 +258,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { LOG.info("Number of containers received in the original request: " + Integer.toString(newlyFinished.size())); + // Make sure this request is picked up by all async heartbeat handlers + interceptor.drainAllAsyncQueue(false); + // Send max 10 heart beats to receive all the containers. If not, we will // fail the test int numHeartbeat = 0; while (containersForReleasedContainerIds.size() < relList.size() && numHeartbeat++ < 10) { - allocateResponse = - interceptor.allocate(Records.newRecord(AllocateRequest.class)); + allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); + newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); @@ -267,65 +288,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { containersForReleasedContainerIds.size()); } + private void checkAMRMToken(Token amrmToken) { + if (amrmToken != null) { + // The token should be the one issued by home MockRM + Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0))); + } + } + @Test public void testMultipleSubClusters() throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + 
registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + // Allocate the first batch of containers, with sc1 and sc2 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-2")); - // Allocate the first batch of containers, with sc1 and sc2 active - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance("SC-2")); + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + // Allocate the second batch of containers, with sc1 and sc3 active + deRegisterSubCluster(SubClusterId.newInstance("SC-2")); + registerSubCluster(SubClusterId.newInstance("SC-3")); - // Allocate the second batch of containers, with sc1 and sc3 active - deRegisterSubCluster(SubClusterId.newInstance("SC-2")); - registerSubCluster(SubClusterId.newInstance("SC-3")); + numberOfContainers = 1; + containers.addAll( + 
getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - numberOfContainers = 1; - containers.addAll( - getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); - Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + // Allocate the third batch of containers with only in home sub-cluster + // active + deRegisterSubCluster(SubClusterId.newInstance("SC-1")); + deRegisterSubCluster(SubClusterId.newInstance("SC-3")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - // Allocate the third batch of containers with only in home sub-cluster - // active - deRegisterSubCluster(SubClusterId.newInstance("SC-1")); - deRegisterSubCluster(SubClusterId.newInstance("SC-3")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + numberOfContainers = 2; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers * 1)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - numberOfContainers = 2; - containers.addAll( - getContainersAndAssert(numberOfContainers, numberOfContainers * 1)); - Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + // Release all containers + releaseContainersAndAssert(containers); - // Release all containers - releaseContainersAndAssert(containers); + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + 
Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); - FinishApplicationMasterResponse finshResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finshResponse); - Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); } /* @@ -333,49 +370,58 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { */ @Test public void testReregister() throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - // Allocate the first batch of containers - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + // Allocate the first batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - interceptor.setShouldReRegisterNext(); + 
interceptor.setShouldReRegisterNext(); - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - interceptor.setShouldReRegisterNext(); + interceptor.setShouldReRegisterNext(); - // Release all containers - releaseContainersAndAssert(containers); + // Release all containers + releaseContainersAndAssert(containers); - interceptor.setShouldReRegisterNext(); + interceptor.setShouldReRegisterNext(); - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - FinishApplicationMasterResponse finshResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finshResponse); - Assert.assertEquals(true, finshResponse.getIsUnregistered()); + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); } /* @@ -442,6 +488,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { // Use port number 1001 to let mock RM block in the register call response = interceptor.registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null)); + lastResponseId = 0; } catch (Exception e) { 
LOG.info("Register thread exception", e); response = null; @@ -460,9 +507,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { testRecover(null); } - public void testRecover(RegistryOperations registryObj) throws Exception { - ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); - userInfo.getUser().doAs(new PrivilegedExceptionAction() { + protected void testRecover(final RegistryOperations registryObj) + throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { interceptor = new TestableFederationInterceptor(); @@ -480,6 +529,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); @@ -492,6 +542,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { getContainersAndAssert(numberOfContainers, numberOfContainers * 2); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + // Prepare for Federation Interceptor restart and recover Map recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId); @@ -517,22 +570,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - Assert.assertEquals(Integer.MAX_VALUE, - interceptor.getLastHomeResponseId()); // The first allocate call expects a fail-over exception and re-register - int responseId = 10; - AllocateRequest allocateRequest = - Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(responseId); try { - interceptor.allocate(allocateRequest); + 
AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + AllocateResponse allocateResponse = + interceptor.allocate(allocateRequest); + lastResponseId = allocateResponse.getResponseId(); Assert.fail("Expecting an ApplicationMasterNotRegisteredException " + " after FederationInterceptor restarts and recovers"); } catch (ApplicationMasterNotRegisteredException e) { } interceptor.registerApplicationMaster(registerReq); - Assert.assertEquals(responseId, interceptor.getLastHomeResponseId()); + lastResponseId = 0; // Release all containers releaseContainersAndAssert(containers); @@ -614,6 +666,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; } } @@ -629,6 +682,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; // Register the application second time with a different request obj registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); @@ -637,6 +691,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { registerReq.setTrackingUrl("different"); try { registerResponse = interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; Assert.fail("Should throw if a different request obj is used"); } catch (YarnException e) { } @@ -689,20 +744,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { @Test public void testSecondAttempt() throws Exception { - ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); - userInfo.getUser().doAs(new PrivilegedExceptionAction() { + final RegisterApplicationMasterRequest registerReq = + 
Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(testAppId); - registerReq.setTrackingUrl(""); - RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); @@ -714,10 +771,13 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { List containers = getContainersAndAssert(numberOfContainers, numberOfContainers * 2); for (Container c : containers) { - System.out.println(c.getId() + " ha"); + LOG.info("Allocated container " + c.getId()); } Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + // Preserve the mock RM instances for secondaries ConcurrentHashMap secondaries = interceptor.getSecondaryRMs(); @@ -729,8 +789,20 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor = new TestableFederationInterceptor(null, secondaries); interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null, null, registry)); - registerResponse = interceptor.registerApplicationMaster(registerReq); + return null; + } + }); + // Update the ugi with new attemptId + ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws 
Exception { + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; + + int numberOfContainers = 3; // Should re-attach secondaries and get the three running containers Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(numberOfContainers, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 33617d41558..78f6eb068ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import 
org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Extends the FederationInterceptor and overrides methods to provide a testable * implementation of FederationInterceptor. */ public class TestableFederationInterceptor extends FederationInterceptor { + public static final Logger LOG = + LoggerFactory.getLogger(TestableFederationInterceptor.class); + private ConcurrentHashMap secondaryResourceManagers = new ConcurrentHashMap<>(); private AtomicInteger runningIndex = new AtomicInteger(0); @@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor { return new TestableUnmanagedAMPoolManager(threadPool); } + @Override + protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( + Configuration conf, ApplicationId appId) { + return new TestableAMRequestHandlerThread(conf, appId); + } + @SuppressWarnings("unchecked") @Override protected T createHomeRMProxy(AMRMProxyApplicationContext appContext, @@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor { return secondaryResourceManagers; } + protected MockResourceManagerFacade getSecondaryRM(String scId) { + return secondaryResourceManagers.get(scId); + } + + /** + * Drain all async heartbeat threads, comes in two flavors: + * + * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to + * pick up all pending heartbeat requests. Not necessarily wait for all + * threads to finish processing the last request. This is used to make sure + * all new UAM are launched by the async threads, but at the same time will + * finish draining while (slow) RM is still processing the last heartbeat + * request. + * + * 2. waitForAsyncHBThreadFinish == true. 
Wait for all async threads to finish + * processing all heartbeat requests. + */ + protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish) + throws YarnException { + + LOG.info("waiting to drain home heartbeat handler"); + if (waitForAsyncHBThreadFinish) { + getHomeHeartbeartHandler().drainHeartbeatThread(); + } else { + while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + + LOG.info("waiting to drain UAM heartbeat handlers"); + UnmanagedAMPoolManager uamPool = getUnmanagedAMPool(); + if (waitForAsyncHBThreadFinish) { + getUnmanagedAMPool().drainUAMHeartbeats(); + } else { + while (true) { + boolean done = true; + for (String scId : uamPool.getAllUAMIds()) { + if (uamPool.getRequestQueueSize(scId) > 0) { + done = false; + break; + } + } + if (done) { + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + } + + protected UserGroupInformation getUGIWithToken( + ApplicationAttemptId appAttemptId) { + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1); + ugi.addTokenIdentifier(token); + return ugi; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager. @@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor { String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { super(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, "TEST"); + setHandlerThread(new TestableAMRequestHandlerThread(conf, appId)); } /** @@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor { YarnConfiguration.getClusterId(config)); } } + + /** + * Wrap the handler thread so it calls from the same user. 
+ */ + protected class TestableAMRequestHandlerThread + extends AMHeartbeatRequestHandler { + public TestableAMRequestHandlerThread(Configuration conf, + ApplicationId applicationId) { + super(conf, applicationId); + } + + @Override + public void run() { + try { + getUGIWithToken(getAttemptId()) + .doAs(new PrivilegedExceptionAction() { + @Override + public Object run() { + TestableAMRequestHandlerThread.super.run(); + return null; + } + }); + } catch (Exception e) { + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6fe0aa9c9c3..70b7498b9e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -84,7 +84,6 @@ import com.google.common.annotations.VisibleForTesting; public class ApplicationMasterService extends AbstractService implements ApplicationMasterProtocol { private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); - private static final int PRE_REGISTER_RESPONSE_ID = -1; private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; @@ -377,11 +376,6 @@ public class ApplicationMasterService extends AbstractService implements protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); - private int getNextResponseId(int responseId) { - // Loop between 0 to Integer.MAX_VALUE - return 
(responseId + 1) & Integer.MAX_VALUE; - } - @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { @@ -415,8 +409,8 @@ public class ApplicationMasterService extends AbstractService implements } // Normally request.getResponseId() == lastResponse.getResponseId() - if (getNextResponseId(request.getResponseId()) == lastResponse - .getResponseId()) { + if (AMRMClientUtils.getNextResponseId( + request.getResponseId()) == lastResponse.getResponseId()) { // heartbeat one step old, simply return lastReponse return lastResponse; } else if (request.getResponseId() != lastResponse.getResponseId()) { @@ -461,7 +455,8 @@ public class ApplicationMasterService extends AbstractService implements * need to worry about unregister call occurring in between (which * removes the lock object). */ - response.setResponseId(getNextResponseId(lastResponse.getResponseId())); + response.setResponseId( + AMRMClientUtils.getNextResponseId(lastResponse.getResponseId())); lock.setAllocateResponse(response); return response; } @@ -472,7 +467,7 @@ public class ApplicationMasterService extends AbstractService implements recordFactory.newRecordInstance(AllocateResponse.class); // set response id to -1 before application master for the following // attemptID get registered - response.setResponseId(PRE_REGISTER_RESPONSE_ID); + response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID); LOG.info("Registering app attempt : " + attemptId); responseMap.put(attemptId, new AllocateResponseLock(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);