YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-09-24 11:37:05 -07:00
parent 8de5c923b4
commit 3090922805
12 changed files with 626 additions and 302 deletions

View File

@ -3221,6 +3221,11 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.federation.resolver." "org.apache.hadoop.yarn.server.federation.resolver."
+ "DefaultSubClusterResolverImpl"; + "DefaultSubClusterResolverImpl";
// the maximum wait time for the first async heartbeat response
public static final String FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS =
FEDERATION_PREFIX + "amrmproxy.hb.maximum.wait.ms";
public static final long DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = 5000;
// AMRMProxy split-merge timeout for active sub-clusters. We will not route // AMRMProxy split-merge timeout for active sub-clusters. We will not route
// new asks to expired sub-clusters. // new asks to expired sub-clusters.
public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =

View File

@ -105,6 +105,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);

View File

@ -52,6 +52,8 @@ public final class AMRMClientUtils {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(AMRMClientUtils.class); LoggerFactory.getLogger(AMRMClientUtils.class);
public static final int PRE_REGISTER_RESPONSE_ID = -1;
public static final String APP_ALREADY_REGISTERED_MESSAGE = public static final String APP_ALREADY_REGISTERED_MESSAGE =
"Application Master is already registered : "; "Application Master is already registered : ";
@ -152,6 +154,11 @@ public final class AMRMClientUtils {
} }
} }
public static int getNextResponseId(int responseId) {
// Loop between 0 to Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;
}
public static void addToOutstandingSchedulingRequests( public static void addToOutstandingSchedulingRequests(
Collection<SchedulingRequest> requests, Collection<SchedulingRequest> requests,
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) { Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {

View File

@ -47,6 +47,9 @@ public class AMHeartbeatRequestHandler extends Thread {
// Indication flag for the thread to keep running // Indication flag for the thread to keep running
private volatile boolean keepRunning; private volatile boolean keepRunning;
// For unit test draining
private volatile boolean isThreadWaiting;
private Configuration conf; private Configuration conf;
private ApplicationId applicationId; private ApplicationId applicationId;
@ -61,6 +64,7 @@ public class AMHeartbeatRequestHandler extends Thread {
this.setUncaughtExceptionHandler( this.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler()); new HeartBeatThreadUncaughtExceptionHandler());
this.keepRunning = true; this.keepRunning = true;
this.isThreadWaiting = false;
this.conf = conf; this.conf = conf;
this.applicationId = applicationId; this.applicationId = applicationId;
@ -82,12 +86,15 @@ public class AMHeartbeatRequestHandler extends Thread {
while (keepRunning) { while (keepRunning) {
AsyncAllocateRequestInfo requestInfo; AsyncAllocateRequestInfo requestInfo;
try { try {
requestInfo = requestQueue.take(); this.isThreadWaiting = true;
requestInfo = this.requestQueue.take();
this.isThreadWaiting = false;
if (requestInfo == null) { if (requestInfo == null) {
throw new YarnException( throw new YarnException(
"Null requestInfo taken from request queue"); "Null requestInfo taken from request queue");
} }
if (!keepRunning) { if (!this.keepRunning) {
break; break;
} }
@ -98,7 +105,7 @@ public class AMHeartbeatRequestHandler extends Thread {
throw new YarnException("Null allocateRequest from requestInfo"); throw new YarnException("Null allocateRequest from requestInfo");
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" LOG.debug("Sending Heartbeat to RM. AskList:"
+ ((request.getAskList() == null) ? " empty" + ((request.getAskList() == null) ? " empty"
: request.getAskList().size())); : request.getAskList().size()));
} }
@ -181,6 +188,16 @@ public class AMHeartbeatRequestHandler extends Thread {
} }
} }
@VisibleForTesting
public void drainHeartbeatThread() {
while (!this.isThreadWaiting || this.requestQueue.size() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
@VisibleForTesting @VisibleForTesting
public int getRequestQueueSize() { public int getRequestQueueSize() {
return this.requestQueue.size(); return this.requestQueue.size();

View File

@ -78,7 +78,7 @@ public class FederationRegistryClient {
* *
* @return the list of known applications * @return the list of known applications
*/ */
public List<String> getAllApplications() { public synchronized List<String> getAllApplications() {
// Suppress the exception here because it is valid that the entry does not // Suppress the exception here because it is valid that the entry does not
// exist // exist
List<String> applications = null; List<String> applications = null;
@ -99,7 +99,7 @@ public class FederationRegistryClient {
* For testing, delete all application records in registry. * For testing, delete all application records in registry.
*/ */
@VisibleForTesting @VisibleForTesting
public void cleanAllApplications() { public synchronized void cleanAllApplications() {
try { try {
removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
true, false); true, false);
@ -115,7 +115,7 @@ public class FederationRegistryClient {
* @param token the UAM of the application * @param token the UAM of the application
* @return whether the amrmToken is added or updated to a new value * @return whether the amrmToken is added or updated to a new value
*/ */
public boolean writeAMRMTokenForUAM(ApplicationId appId, public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
String subClusterId, Token<AMRMTokenIdentifier> token) { String subClusterId, Token<AMRMTokenIdentifier> token) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId); this.appSubClusterTokenMap.get(appId);
@ -154,7 +154,7 @@ public class FederationRegistryClient {
* @param appId application id * @param appId application id
* @return the sub-cluster to UAM token mapping * @return the sub-cluster to UAM token mapping
*/ */
public Map<String, Token<AMRMTokenIdentifier>> public synchronized Map<String, Token<AMRMTokenIdentifier>>
loadStateFromRegistry(ApplicationId appId) { loadStateFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>(); Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
// Suppress the exception here because it is valid that the entry does not // Suppress the exception here because it is valid that the entry does not
@ -203,7 +203,7 @@ public class FederationRegistryClient {
* *
* @param appId application id * @param appId application id
*/ */
public void removeAppFromRegistry(ApplicationId appId) { public synchronized void removeAppFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId); this.appSubClusterTokenMap.get(appId);
LOG.info("Removing all registry entries for {}", appId); LOG.info("Removing all registry entries for {}", appId);

View File

@ -407,4 +407,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer(); return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
} }
@VisibleForTesting
public int getRequestQueueSize(String uamId) throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
return this.unmanagedAppMasterMap.get(uamId).getRequestQueueSize();
}
@VisibleForTesting
public void drainUAMHeartbeats() {
for (UnmanagedApplicationManager uam : this.unmanagedAppMasterMap
.values()) {
uam.drainHeartbeatThread();
}
}
} }

View File

@ -225,6 +225,10 @@ public class UnmanagedApplicationManager {
LOG.debug("RegisterUAM returned existing NM token for node " LOG.debug("RegisterUAM returned existing NM token for node "
+ nmToken.getNodeId()); + nmToken.getNodeId());
} }
LOG.info(
"RegisterUAM returned {} existing running container and {} NM tokens",
response.getContainersFromPreviousAttempts().size(),
response.getNMTokensFromPreviousAttempts().size());
// Only when register succeed that we start the heartbeat thread // Only when register succeed that we start the heartbeat thread
this.heartbeatHandler.setDaemon(true); this.heartbeatHandler.setDaemon(true);
@ -516,4 +520,14 @@ public class UnmanagedApplicationManager {
public int getRequestQueueSize() { public int getRequestQueueSize() {
return this.heartbeatHandler.getRequestQueueSize(); return this.heartbeatHandler.getRequestQueueSize();
} }
@VisibleForTesting
protected void setHandlerThread(AMHeartbeatRequestHandler thread) {
this.heartbeatHandler = thread;
}
@VisibleForTesting
protected void drainHeartbeatThread() {
this.heartbeatHandler.drainHeartbeatThread();
}
} }

View File

@ -189,8 +189,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private HashSet<ApplicationId> applicationMap = new HashSet<>(); private HashSet<ApplicationId> applicationMap = new HashSet<>();
private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>(); private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
private HashMap<ApplicationAttemptId, private HashMap<ApplicationId, List<ContainerId>> applicationContainerIdMap =
List<ContainerId>> applicationContainerIdMap = new HashMap<>(); new HashMap<>();
private int rmId;
private AtomicInteger containerIndex = new AtomicInteger(0); private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf; private Configuration conf;
private int subClusterId; private int subClusterId;
@ -203,6 +204,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private boolean shouldReRegisterNext = false; private boolean shouldReRegisterNext = false;
private boolean shouldWaitForSyncNextAllocate = false;
// For unit test synchronization // For unit test synchronization
private static Object syncObj = new Object(); private static Object syncObj = new Object();
@ -218,6 +221,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MockResourceManagerFacade(Configuration conf, int startContainerIndex, public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
int subClusterId, boolean isRunning) { int subClusterId, boolean isRunning) {
this.conf = conf; this.conf = conf;
this.rmId = startContainerIndex;
this.containerIndex.set(startContainerIndex); this.containerIndex.set(startContainerIndex);
this.subClusterId = subClusterId; this.subClusterId = subClusterId;
this.isRunning = isRunning; this.isRunning = isRunning;
@ -259,17 +263,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning(); validateRunning();
ApplicationAttemptId attemptId = getAppIdentifier(); ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Registering application attempt: " + attemptId); LOG.info("Registering application attempt: " + attemptId);
ApplicationId appId = attemptId.getApplicationId();
List<Container> containersFromPreviousAttempt = null; List<Container> containersFromPreviousAttempt = null;
synchronized (applicationContainerIdMap) { synchronized (applicationContainerIdMap) {
if (applicationContainerIdMap.containsKey(attemptId)) { if (applicationContainerIdMap.containsKey(appId)) {
if (keepContainerOnUams.contains(attemptId.getApplicationId())) { if (keepContainerOnUams.contains(appId)) {
// For UAM with the keepContainersFromPreviousAttempt flag, return all // For UAM with the keepContainersFromPreviousAttempt flag, return all
// running containers // running containers
containersFromPreviousAttempt = new ArrayList<>(); containersFromPreviousAttempt = new ArrayList<>();
for (ContainerId containerId : applicationContainerIdMap for (ContainerId containerId : applicationContainerIdMap.get(appId)) {
.get(attemptId)) {
containersFromPreviousAttempt.add(Container.newInstance(containerId, containersFromPreviousAttempt.add(Container.newInstance(containerId,
null, null, null, null, null)); null, null, null, null, null));
} }
@ -279,7 +283,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
} }
} else { } else {
// Keep track of the containers that are returned to this application // Keep track of the containers that are returned to this application
applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>()); applicationContainerIdMap.put(appId, new ArrayList<ContainerId>());
} }
} }
@ -314,6 +318,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationAttemptId attemptId = getAppIdentifier(); ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Finishing application attempt: " + attemptId); LOG.info("Finishing application attempt: " + attemptId);
ApplicationId appId = attemptId.getApplicationId();
if (shouldReRegisterNext) { if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register."; String message = "AM is not registered, should re-register.";
@ -324,8 +329,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
synchronized (applicationContainerIdMap) { synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application // Remove the containers that were being tracked for this application
Assert.assertTrue("The application id is NOT registered: " + attemptId, Assert.assertTrue("The application id is NOT registered: " + attemptId,
applicationContainerIdMap.containsKey(attemptId)); applicationContainerIdMap.containsKey(appId));
applicationContainerIdMap.remove(attemptId); applicationContainerIdMap.remove(appId);
} }
return FinishApplicationMasterResponse.newInstance( return FinishApplicationMasterResponse.newInstance(
@ -350,6 +355,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationAttemptId attemptId = getAppIdentifier(); ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Allocate from application attempt: " + attemptId); LOG.info("Allocate from application attempt: " + attemptId);
ApplicationId appId = attemptId.getApplicationId();
if (shouldReRegisterNext) { if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register."; String message = "AM is not registered, should re-register.";
@ -357,6 +363,21 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throw new ApplicationMasterNotRegisteredException(message); throw new ApplicationMasterNotRegisteredException(message);
} }
// Wait for signal for certain test cases
synchronized (syncObj) {
if (shouldWaitForSyncNextAllocate) {
shouldWaitForSyncNextAllocate = false;
LOG.info("Allocate call in RM start waiting");
try {
syncObj.wait();
LOG.info("Allocate call in RM wait finished");
} catch (InterruptedException e) {
LOG.info("Allocate call in RM wait interrupted", e);
}
}
}
ArrayList<Container> containerList = new ArrayList<Container>(); ArrayList<Container> containerList = new ArrayList<Container>();
if (request.getAskList() != null) { if (request.getAskList() != null) {
for (ResourceRequest rr : request.getAskList()) { for (ResourceRequest rr : request.getAskList()) {
@ -381,9 +402,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// will need it in future // will need it in future
Assert.assertTrue( Assert.assertTrue(
"The application id is Not registered before allocate(): " "The application id is Not registered before allocate(): "
+ attemptId, + appId,
applicationContainerIdMap.containsKey(attemptId)); applicationContainerIdMap.containsKey(appId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId); List<ContainerId> ids = applicationContainerIdMap.get(appId);
ids.add(containerId); ids.add(containerId);
} }
} }
@ -395,12 +416,10 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
&& request.getReleaseList().size() > 0) { && request.getReleaseList().size() > 0) {
LOG.info("Releasing containers: " + request.getReleaseList().size()); LOG.info("Releasing containers: " + request.getReleaseList().size());
synchronized (applicationContainerIdMap) { synchronized (applicationContainerIdMap) {
Assert Assert.assertTrue(
.assertTrue( "The application id is not registered before allocate(): " + appId,
"The application id is not registered before allocate(): " applicationContainerIdMap.containsKey(appId));
+ attemptId, List<ContainerId> ids = applicationContainerIdMap.get(appId);
applicationContainerIdMap.containsKey(attemptId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
for (ContainerId id : request.getReleaseList()) { for (ContainerId id : request.getReleaseList()) {
boolean found = false; boolean found = false;
@ -426,7 +445,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ " for application attempt: " + conf.get("AMRMTOKEN")); + " for application attempt: " + conf.get("AMRMTOKEN"));
// Always issue a new AMRMToken as if RM rolled master key // Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); Token newAMRMToken = Token.newInstance(new byte[0],
Integer.toString(this.rmId), new byte[0], "");
return AllocateResponse.newInstance(0, completedList, containerList, return AllocateResponse.newInstance(0, completedList, containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
@ -434,6 +454,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
new ArrayList<UpdatedContainer>()); new ArrayList<UpdatedContainer>());
} }
public void setWaitForSyncNextAllocate(boolean wait) {
synchronized (syncObj) {
shouldWaitForSyncNextAllocate = wait;
}
}
@Override @Override
public GetApplicationReportResponse getApplicationReport( public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException { GetApplicationReportRequest request) throws YarnException, IOException {
@ -624,14 +650,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning(); validateRunning();
ApplicationAttemptId attemptId = request.getApplicationAttemptId(); ApplicationId appId = request.getApplicationAttemptId().getApplicationId();
List<ContainerReport> containers = new ArrayList<>(); List<ContainerReport> containers = new ArrayList<>();
synchronized (applicationContainerIdMap) { synchronized (applicationContainerIdMap) {
// Return the list of running containers that were being tracked for this // Return the list of running containers that were being tracked for this
// application // application
Assert.assertTrue("The application id is NOT registered: " + attemptId, Assert.assertTrue("The application id is NOT registered: " + appId,
applicationContainerIdMap.containsKey(attemptId)); applicationContainerIdMap.containsKey(appId));
List<ContainerId> ids = applicationContainerIdMap.get(attemptId); List<ContainerId> ids = applicationContainerIdMap.get(appId);
for (ContainerId c : ids) { for (ContainerId c : ids) {
containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0, containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
null, null, 0, null, null)); null, null, 0, null, null));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -62,14 +64,18 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@ -80,9 +86,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -116,6 +122,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
NMSS_CLASS_PREFIX + "secondarySC/"; NMSS_CLASS_PREFIX + "secondarySC/";
public static final String STRING_TO_BYTE_FORMAT = "UTF-8"; public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
private static final RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
/**
* From AM's perspective, FederationInterceptor behaves exactly the same as
* YarnRM (ApplicationMasterService). This is to remember the last heart beat
* response, used to handle duplicate heart beat and responseId from AM.
*/
private AllocateResponse lastAllocateResponse;
private final Object lastAllocateResponseLock = new Object();
private ApplicationAttemptId attemptId; private ApplicationAttemptId attemptId;
/** /**
@ -124,7 +141,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/ */
private AMRMClientRelayer homeRMRelayer; private AMRMClientRelayer homeRMRelayer;
private SubClusterId homeSubClusterId; private SubClusterId homeSubClusterId;
private volatile int lastHomeResponseId; private AMHeartbeatRequestHandler homeHeartbeartHandler;
/** /**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster), * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@ -146,7 +163,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/** /**
* Stores the AllocateResponses that are received asynchronously from all the * Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers except the home RM. * sub-cluster resource managers, including home RM.
*/ */
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink; private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
@ -194,14 +211,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/** The policy used to split requests among sub-clusters. */ /** The policy used to split requests among sub-clusters. */
private FederationAMRMProxyPolicy policyInterpreter; private FederationAMRMProxyPolicy policyInterpreter;
/**
* The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
* issued by home RM.
*/
private UserGroupInformation appOwner;
private FederationRegistryClient registryClient; private FederationRegistryClient registryClient;
// the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs;
private MonotonicClock clock = new MonotonicClock();
/** /**
* Creates an instance of the FederationInterceptor class. * Creates an instance of the FederationInterceptor class.
*/ */
@ -213,7 +229,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.secondaryRelayers = new ConcurrentHashMap<>(); this.secondaryRelayers = new ConcurrentHashMap<>();
this.amRegistrationRequest = null; this.amRegistrationRequest = null;
this.amRegistrationResponse = null; this.amRegistrationResponse = null;
this.lastHomeResponseId = Integer.MAX_VALUE;
this.justRecovered = false; this.justRecovered = false;
} }
@ -233,8 +248,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
setConf(conf); setConf(conf);
} }
// The proxy ugi used to talk to home RM as well as Yarn Registry, loaded
// with the up-to-date AMRMToken issued by home RM.
UserGroupInformation appOwner;
try { try {
this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
UserGroupInformation.getCurrentUser()); UserGroupInformation.getCurrentUser());
} catch (Exception ex) { } catch (Exception ex) {
throw new YarnRuntimeException(ex); throw new YarnRuntimeException(ex);
@ -242,10 +260,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (appContext.getRegistryClient() != null) { if (appContext.getRegistryClient() != null) {
this.registryClient = new FederationRegistryClient(conf, this.registryClient = new FederationRegistryClient(conf,
appContext.getRegistryClient(), this.appOwner); appContext.getRegistryClient(), appOwner);
// Add all app tokens for Yarn Registry access // Add all app tokens for Yarn Registry access
if (appContext.getCredentials() != null) { if (appContext.getCredentials() != null) {
this.appOwner.addCredentials(appContext.getCredentials()); appOwner.addCredentials(appContext.getCredentials());
} }
} }
@ -254,9 +272,21 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId = this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
ApplicationMasterProtocol.class, this.appOwner), appId, ApplicationMasterProtocol.class, appOwner), appId,
this.homeSubClusterId.toString()); this.homeSubClusterId.toString());
this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId);
this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer);
this.homeHeartbeartHandler.setUGI(appOwner);
this.homeHeartbeartHandler.setDaemon(true);
this.homeHeartbeartHandler.start();
// set lastResponseId to -1 before application master registers
this.lastAllocateResponse =
RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
this.lastAllocateResponse
.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
this.federationFacade = FederationStateStoreFacade.getInstance(); this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver(); this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@ -265,6 +295,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.uamPool.init(conf); this.uamPool.init(conf);
this.uamPool.start(); this.uamPool.start();
this.heartbeatMaxWaitTimeMs =
conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
} }
@Override @Override
@ -272,6 +306,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
super.recover(recoveredDataMap); super.recover(recoveredDataMap);
LOG.info("Recovering data for FederationInterceptor for {}", LOG.info("Recovering data for FederationInterceptor for {}",
this.attemptId); this.attemptId);
this.justRecovered = true;
if (recoveredDataMap == null) { if (recoveredDataMap == null) {
return; return;
} }
@ -294,9 +330,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationResponse = this.amRegistrationResponse =
new RegisterApplicationMasterResponsePBImpl(pb); new RegisterApplicationMasterResponsePBImpl(pb);
LOG.info("amRegistrationResponse recovered for {}", this.attemptId); LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
// Trigger re-register and full pending re-send only if we have a
// saved register response. This should always be true though.
this.justRecovered = true;
} }
// Recover UAM amrmTokens from registry or NMSS // Recover UAM amrmTokens from registry or NMSS
@ -355,6 +388,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.getContainersFromPreviousAttempts()) { .getContainersFromPreviousAttempts()) {
containerIdToSubClusterIdMap.put(container.getId(), subClusterId); containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
containers++; containers++;
LOG.debug(" From subcluster " + subClusterId
+ " running container " + container.getId());
} }
LOG.info("Recovered {} running containers from UAM in {}", LOG.info("Recovered {} running containers from UAM in {}",
response.getContainersFromPreviousAttempts().size(), response.getContainersFromPreviousAttempts().size(),
@ -384,7 +419,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
LOG.debug(" From home RM " + this.homeSubClusterId LOG.debug(" From home RM " + this.homeSubClusterId
+ " running container " + container.getContainerId()); + " running container " + container.getContainerId());
} }
LOG.info("{} running containers including AM recovered from home RM ", LOG.info("{} running containers including AM recovered from home RM {}",
response.getContainerList().size(), this.homeSubClusterId); response.getContainerList().size(), this.homeSubClusterId);
LOG.info( LOG.info(
@ -411,8 +446,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* so that when AM registers more than once, it returns the same register * so that when AM registers more than once, it returns the same register
* success response instead of throwing * success response instead of throwing
* {@link InvalidApplicationMasterRequestException}. Furthermore, we present * {@link InvalidApplicationMasterRequestException}. Furthermore, we present
* to AM as if we are the RM that never fails over. When actual RM fails over, * to AM as if we are the RM that never fails over (except when AMRMProxy
* we always re-register automatically. * restarts). When actual RM fails over, we always re-register automatically.
* *
* We did this because FederationInterceptor can receive concurrent register * We did this because FederationInterceptor can receive concurrent register
* requests from AM because of timeout between AM and AMRMProxy, which is * requests from AM because of timeout between AM and AMRMProxy, which is
@ -425,6 +460,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public synchronized RegisterApplicationMasterResponse public synchronized RegisterApplicationMasterResponse
registerApplicationMaster(RegisterApplicationMasterRequest request) registerApplicationMaster(RegisterApplicationMasterRequest request)
throws YarnException, IOException { throws YarnException, IOException {
// Reset the heartbeat responseId to zero upon register
synchronized (this.lastAllocateResponseLock) {
this.lastAllocateResponse.setResponseId(0);
}
this.justRecovered = false;
// If AM is calling with a different request, complain // If AM is calling with a different request, complain
if (this.amRegistrationRequest != null) { if (this.amRegistrationRequest != null) {
if (!this.amRegistrationRequest.equals(request)) { if (!this.amRegistrationRequest.equals(request)) {
@ -524,34 +566,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/ */
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnException { throws YarnException, IOException {
Preconditions.checkArgument(this.policyInterpreter != null, Preconditions.checkArgument(this.policyInterpreter != null,
"Allocate should be called after registerApplicationMaster"); "Allocate should be called after registerApplicationMaster");
if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) { if (this.justRecovered) {
// Save the responseId home RM is expecting
this.lastHomeResponseId = request.getResponseId();
throw new ApplicationMasterNotRegisteredException( throw new ApplicationMasterNotRegisteredException(
"AMRMProxy just restarted and recovered for " + this.attemptId "AMRMProxy just restarted and recovered for " + this.attemptId
+ ". AM should re-register and full re-send pending requests."); + ". AM should re-register and full re-send pending requests.");
} }
// Override responseId in the request in two cases: // Check responseId and handle duplicate heartbeat exactly same as RM
// synchronized (this.lastAllocateResponseLock) {
// 1. After we just recovered after an NM restart and AM's responseId is LOG.info("Heartbeat from " + this.attemptId + " with responseId "
// reset due to the exception we generate. We need to override the + request.getResponseId() + " when we are expecting "
// responseId to the one homeRM expects. + this.lastAllocateResponse.getResponseId());
// // Normally request.getResponseId() == lastResponse.getResponseId()
// 2. After homeRM fail-over, the allocate response with reseted responseId if (AMRMClientUtils.getNextResponseId(
// might not be returned successfully back to AM because of RPC connection request.getResponseId()) == this.lastAllocateResponse
// timeout between AM and AMRMProxy. In this case, we remember and reset the .getResponseId()) {
// responseId for AM. // heartbeat one step old, simply return lastReponse
if (this.justRecovered return this.lastAllocateResponse;
|| request.getResponseId() > this.lastHomeResponseId) { } else if (request.getResponseId() != this.lastAllocateResponse
LOG.warn("Setting allocate responseId for {} from {} to {}", .getResponseId()) {
this.attemptId, request.getResponseId(), this.lastHomeResponseId); throw new InvalidApplicationMasterRequestException(
request.setResponseId(this.lastHomeResponseId); AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId,
this.lastAllocateResponse.getResponseId(),
request.getResponseId()));
}
} }
try { try {
@ -560,71 +602,55 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<SubClusterId, AllocateRequest> requests = Map<SubClusterId, AllocateRequest> requests =
splitAllocateRequest(request); splitAllocateRequest(request);
// Send the requests to the secondary sub-cluster resource managers. /**
// These secondary requests are send asynchronously and the responses will * Send the requests to the all sub-cluster resource managers. All
// be collected and merged with the home response. In addition, it also * requests are synchronously triggered but sent asynchronously. Later the
// return the newly registered Unmanaged AMs. * responses will be collected and merged. In addition, it also returns
Registrations newRegistrations = * the newly registered UAMs.
sendRequestsToSecondaryResourceManagers(requests); */
Registrations newRegistrations = sendRequestsToResourceManagers(requests);
// Send the request to the home RM and get the response // Wait for the first async response to arrive
AllocateRequest homeRequest = requests.get(this.homeSubClusterId); long startTime = this.clock.getTime();
LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId, synchronized (this.asyncResponseSink) {
homeRequest.getResponseId()); try {
this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs);
AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest); } catch (InterruptedException e) {
}
// Reset the flag after the first successful homeRM allocate response,
// otherwise keep overriding the responseId of new allocate request
if (this.justRecovered) {
this.justRecovered = false;
} }
long firstResponseTime = this.clock.getTime() - startTime;
// Notify policy of home response // An extra brief wait for other async heart beats, so that most of their
// responses can make it back to AM in the same heart beat round trip.
try { try {
this.policyInterpreter.notifyOfResponse(this.homeSubClusterId, Thread.sleep(firstResponseTime);
homeResponse); } catch (InterruptedException e) {
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+ this.homeSubClusterId, e);
} }
// If the resource manager sent us a new token, add to the current user // Prepare the response to AM
if (homeResponse.getAMRMToken() != null) { AllocateResponse response =
LOG.debug("Received new AMRMToken"); RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
this.appOwner, getConf());
}
// Merge the responses from home and secondary sub-cluster RMs // Merge all responses from response sink
homeResponse = mergeAllocateResponses(homeResponse); mergeAllocateResponses(response);
// Merge the containers and NMTokens from the new registrations into // Merge the containers and NMTokens from the new registrations into
// the homeResponse. // the response
if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) { if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
homeResponse = mergeRegistrationResponses(homeResponse, mergeRegistrationResponses(response,
newRegistrations.getSuccessfulRegistrations()); newRegistrations.getSuccessfulRegistrations());
} }
LOG.info("{} heartbeat response from home RM with responseId {}", // update the responseId and return the final response to AM
this.attemptId, homeResponse.getResponseId()); synchronized (this.lastAllocateResponseLock) {
response.setResponseId(AMRMClientUtils
// Update lastHomeResponseId in three cases: .getNextResponseId(this.lastAllocateResponse.getResponseId()));
// 1. The normal responseId increments this.lastAllocateResponse = response;
// 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
// over, AMRMClientRelayer auto re-register and full re-send for homeRM.
// 3. lastHomeResponseId == MAX_INT. This is the initial case or
// responseId about to overflow and wrap around
if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
|| homeResponse.getResponseId() == 1
|| this.lastHomeResponseId == Integer.MAX_VALUE) {
this.lastHomeResponseId = homeResponse.getResponseId();
} }
return response;
// return the final response to the application master. } catch (Throwable ex) {
return homeResponse; LOG.error("Exception encountered while processing heart beat for "
} catch (IOException ex) { + this.attemptId, ex);
LOG.error("Exception encountered while processing heart beat", ex);
throw new YarnException(ex); throw new YarnException(ex);
} }
} }
@ -696,6 +722,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
FinishApplicationMasterResponse homeResponse = FinishApplicationMasterResponse homeResponse =
this.homeRMRelayer.finishApplicationMaster(request); this.homeRMRelayer.finishApplicationMaster(request);
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
if (subClusterIds.size() > 0) { if (subClusterIds.size() > 0) {
// Wait for other sub-cluster resource managers to return the // Wait for other sub-cluster resource managers to return the
// response and merge it with the home response // response and merge it with the home response
@ -758,10 +787,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
this.threadpool = null; this.threadpool = null;
} }
homeRMRelayer.shutdown();
for(AMRMClientRelayer relayer : secondaryRelayers.values()){ // Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
this.homeRMRelayer.shutdown();
for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) {
relayer.shutdown(); relayer.shutdown();
} }
super.shutdown(); super.shutdown();
} }
@ -781,8 +814,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
@VisibleForTesting @VisibleForTesting
protected int getLastHomeResponseId() { protected ApplicationAttemptId getAttemptId() {
return this.lastHomeResponseId; return this.attemptId;
}
@VisibleForTesting
protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() {
return this.homeHeartbeartHandler;
} }
/** /**
@ -798,6 +836,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return new UnmanagedAMPoolManager(threadPool); return new UnmanagedAMPoolManager(threadPool);
} }
@VisibleForTesting
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
Configuration conf, ApplicationId appId) {
return new AMHeartbeatRequestHandler(conf, appId);
}
/** /**
* Create a proxy instance that is used to connect to the Home resource * Create a proxy instance that is used to connect to the Home resource
* manager. * manager.
@ -872,7 +916,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ "Reattaching in parallel", uamMap.size(), appId); + "Reattaching in parallel", uamMap.size(), appId);
ExecutorCompletionService<RegisterApplicationMasterResponse> ExecutorCompletionService<RegisterApplicationMasterResponse>
completionService = new ExecutorCompletionService<>(threadpool); completionService = new ExecutorCompletionService<>(this.threadpool);
for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) { for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
final SubClusterId subClusterId = final SubClusterId subClusterId =
@ -1047,16 +1091,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/** /**
* This methods sends the specified AllocateRequests to the appropriate * This methods sends the specified AllocateRequests to the appropriate
* sub-cluster resource managers. * sub-cluster resource managers asynchronously.
* *
* @param requests contains the heart beat requests to send to the resource * @param requests contains the heart beat requests to send to the resource
* manager keyed by the resource manager address * manager keyed by the sub-cluster id
* @return the registration responses from the newly added sub-cluster * @return the registration responses from the newly added sub-cluster
* resource managers * resource managers
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
private Registrations sendRequestsToSecondaryResourceManagers( private Registrations sendRequestsToResourceManagers(
Map<SubClusterId, AllocateRequest> requests) Map<SubClusterId, AllocateRequest> requests)
throws YarnException, IOException { throws YarnException, IOException {
@ -1065,32 +1109,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Registrations registrations = registerWithNewSubClusters(requests.keySet()); Registrations registrations = registerWithNewSubClusters(requests.keySet());
// Now that all the registrations are done, send the allocation request // Now that all the registrations are done, send the allocation request
// to the sub-cluster RMs using the Unmanaged application masters // to the sub-cluster RMs asynchronously and don't wait for the response.
// asynchronously and don't wait for the response. The responses will // The responses will arrive asynchronously and will be added to the
// arrive asynchronously and will be added to the response sink. These // response sink, then merged and sent to the application master.
// responses will be sent to the application master in some future heart
// beat response.
for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) { for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
final SubClusterId subClusterId = entry.getKey(); SubClusterId subClusterId = entry.getKey();
if (subClusterId.equals(this.homeSubClusterId)) { if (subClusterId.equals(this.homeSubClusterId)) {
// Skip the request for the home sub-cluster resource manager. // Request for the home sub-cluster resource manager
// It will be handled separately in the allocate() method this.homeHeartbeartHandler.allocateAsync(entry.getValue(),
continue; new HeartbeatCallBack(this.homeSubClusterId, false));
} else {
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
// TODO: This means that the registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
subClusterId);
continue;
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new HeartbeatCallBack(subClusterId, true));
} }
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
// TODO: This means that the registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
subClusterId);
continue;
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new HeartbeatCallBack(subClusterId));
} }
return registrations; return registrations;
@ -1123,7 +1164,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationRequest; this.amRegistrationRequest;
final AMRMProxyApplicationContext appContext = getApplicationContext(); final AMRMProxyApplicationContext appContext = getApplicationContext();
ExecutorCompletionService<RegisterApplicationMasterResponseInfo> ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
completionService = new ExecutorCompletionService<>(threadpool); completionService = new ExecutorCompletionService<>(this.threadpool);
for (final String subClusterId : newSubClusters) { for (final String subClusterId : newSubClusters) {
completionService completionService
@ -1208,21 +1249,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
/** /**
* Merges the responses from other sub-clusters that we received * Merge the responses from all sub-clusters that we received asynchronously
* asynchronously with the specified home cluster response and keeps track of * and keeps track of the containers received from each sub-cluster resource
* the containers received from each sub-cluster resource managers. * managers.
*/ */
private AllocateResponse mergeAllocateResponses( private void mergeAllocateResponses(AllocateResponse mergedResponse) {
AllocateResponse homeResponse) {
// Timing issue, we need to remove the completed and then save the new ones.
removeFinishedContainersFromCache(
homeResponse.getCompletedContainersStatuses());
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
this.homeSubClusterId);
synchronized (this.asyncResponseSink) { synchronized (this.asyncResponseSink) {
for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink for (Entry<SubClusterId, List<AllocateResponse>> entry :
.entrySet()) { this.asyncResponseSink.entrySet()) {
SubClusterId subClusterId = entry.getKey(); SubClusterId subClusterId = entry.getKey();
List<AllocateResponse> responses = entry.getValue(); List<AllocateResponse> responses = entry.getValue();
if (responses.size() > 0) { if (responses.size() > 0) {
@ -1231,14 +1265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
response.getCompletedContainersStatuses()); response.getCompletedContainersStatuses());
cacheAllocatedContainers(response.getAllocatedContainers(), cacheAllocatedContainers(response.getAllocatedContainers(),
subClusterId); subClusterId);
mergeAllocateResponse(homeResponse, response, subClusterId); mergeAllocateResponse(mergedResponse, response, subClusterId);
} }
responses.clear(); responses.clear();
} }
} }
} }
return homeResponse;
} }
/** /**
@ -1256,11 +1288,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
/** /**
* Helper method for merging the responses from the secondary sub cluster RMs * Helper method for merging the registration responses from the secondary sub
* with the home response to return to the AM. * cluster RMs into the allocate response to return to the AM.
*/ */
private AllocateResponse mergeRegistrationResponses( private void mergeRegistrationResponses(AllocateResponse homeResponse,
AllocateResponse homeResponse,
Map<SubClusterId, RegisterApplicationMasterResponse> registrations) { Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry : for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
@ -1292,13 +1323,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
} }
} }
return homeResponse;
} }
private void mergeAllocateResponse(AllocateResponse homeResponse, private void mergeAllocateResponse(AllocateResponse homeResponse,
AllocateResponse otherResponse, SubClusterId otherRMAddress) { AllocateResponse otherResponse, SubClusterId otherRMAddress) {
if (otherResponse.getAMRMToken() != null) {
// Propagate only the new amrmToken from home sub-cluster back to
// AMRMProxyService
if (otherRMAddress.equals(this.homeSubClusterId)) {
homeResponse.setAMRMToken(otherResponse.getAMRMToken());
} else {
throw new YarnRuntimeException(
"amrmToken from UAM " + otherRMAddress + " should be null here");
}
}
if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) { if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) { if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
homeResponse.getAllocatedContainers() homeResponse.getAllocatedContainers()
@ -1406,9 +1446,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
SubClusterId subClusterId) { SubClusterId subClusterId) {
for (Container container : containers) { for (Container container : containers) {
LOG.debug("Adding container {}", container); LOG.debug("Adding container {}", container);
if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
SubClusterId existingSubClusterId = SubClusterId existingSubClusterId =
containerIdToSubClusterIdMap.get(container.getId()); this.containerIdToSubClusterIdMap.get(container.getId());
if (existingSubClusterId.equals(subClusterId)) { if (existingSubClusterId.equals(subClusterId)) {
/* /*
* When RM fails over, the new RM master might send out the same * When RM fails over, the new RM master might send out the same
@ -1441,7 +1482,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
} }
containerIdToSubClusterIdMap.put(container.getId(), subClusterId); this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
} }
} }
@ -1463,7 +1504,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
newRequest.setProgress(originalAMRequest.getProgress()); newRequest.setProgress(originalAMRequest.getProgress());
requestMap.put(subClusterId, newRequest); requestMap.put(subClusterId, newRequest);
} }
return newRequest; return newRequest;
} }
@ -1472,7 +1512,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/ */
private static AllocateRequest createAllocateRequest() { private static AllocateRequest createAllocateRequest() {
AllocateRequest request = AllocateRequest request =
AllocateRequest.newInstance(0, 0, null, null, null); RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
request.setAskList(new ArrayList<ResourceRequest>()); request.setAskList(new ArrayList<ResourceRequest>());
request.setReleaseList(new ArrayList<ContainerId>()); request.setReleaseList(new ArrayList<ContainerId>());
ResourceBlacklistRequest blackList = ResourceBlacklistRequest blackList =
@ -1525,6 +1565,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return this.uamPool.getAllUAMIds().size(); return this.uamPool.getAllUAMIds().size();
} }
@VisibleForTesting
protected UnmanagedAMPoolManager getUnmanagedAMPool() {
return this.uamPool;
}
@VisibleForTesting @VisibleForTesting
public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() { public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
return this.asyncResponseSink; return this.asyncResponseSink;
@ -1535,9 +1580,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/ */
private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> { private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
private SubClusterId subClusterId; private SubClusterId subClusterId;
private boolean isUAM;
HeartbeatCallBack(SubClusterId subClusterId) { HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
this.subClusterId = subClusterId; this.subClusterId = subClusterId;
this.isUAM = isUAM;
} }
@Override @Override
@ -1551,16 +1598,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
asyncResponseSink.put(subClusterId, responses); asyncResponseSink.put(subClusterId, responses);
} }
responses.add(response); responses.add(response);
// Notify main thread about the response arrival
asyncResponseSink.notifyAll();
} }
// Save the new AMRMToken for the UAM if present // Save the new AMRMToken for the UAM if present
if (response.getAMRMToken() != null) { if (this.isUAM && response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null); .convertFromYarn(response.getAMRMToken(), (Text) null);
// Do not further propagate the new amrmToken for UAM
response.setAMRMToken(null);
// Update the token in registry or NMSS // Update the token in registry or NMSS
if (registryClient != null) { if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
subClusterId.getId(), newToken); subClusterId.getId(), newToken)) {
try {
AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(newToken.getIdentifier())));
LOG.info(
"Received new UAM amrmToken with keyId {} and "
+ "service {} from {} for {}, written to Registry",
identifier.getKeyId(), newToken.getService(), subClusterId,
attemptId);
} catch (IOException e) {
}
}
} else if (getNMStateStore() != null) { } else if (getNMStateStore() != null) {
try { try {
getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
@ -1573,11 +1637,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} }
} }
// Notify policy of secondary sub-cluster responses // Notify policy of allocate response
try { try {
policyInterpreter.notifyOfResponse(subClusterId, response); policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) { } catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for home sub-cluster " LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+ subClusterId, e); + subClusterId, e);
} }
} }

View File

@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -95,6 +97,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private int testAppId; private int testAppId;
private ApplicationAttemptId attemptId; private ApplicationAttemptId attemptId;
private volatile int lastResponseId;
@Override @Override
public void setUp() throws IOException { public void setUp() throws IOException {
super.setUp(); super.setUp();
@ -120,6 +124,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
attemptId, "test-user", null, null, null, registry)); attemptId, "test-user", null, null, null, registry));
interceptor.cleanupRegistry(); interceptor.cleanupRegistry();
lastResponseId = 0;
} }
@Override @Override
@ -174,8 +180,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private List<Container> getContainersAndAssert(int numberOfResourceRequests, private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExcepted) throws Exception { int numberOfAllocationExcepted) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<Container> containers = List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests); new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList = List<ResourceRequest> askList =
@ -187,22 +191,31 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateRequest.setAskList(askList); allocateRequest.setAskList(askList);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response", allocateResponse); Assert.assertNotNull("allocate() returned null response", allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers()); containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in the original request: " LOG.info("Number of allocated containers in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size())); + Integer.toString(allocateResponse.getAllocatedContainers().size()));
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
// Send max 10 heart beats to receive all the containers. If not, we will // Send max 10 heart beats to receive all the containers. If not, we will
// fail the test // fail the test
int numHeartbeat = 0; int numHeartbeat = 0;
while (containers.size() < numberOfAllocationExcepted while (containers.size() < numberOfAllocationExcepted
&& numHeartbeat++ < 10) { && numHeartbeat++ < 10) {
allocateResponse = allocateRequest = Records.newRecord(AllocateRequest.class);
interceptor.allocate(Records.newRecord(AllocateRequest.class)); allocateRequest.setResponseId(lastResponseId);
allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response", Assert.assertNotNull("allocate() returned null response",
allocateResponse); allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers()); containers.addAll(allocateResponse.getAllocatedContainers());
@ -220,8 +233,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
throws Exception { throws Exception {
Assert.assertTrue(containers.size() > 0); Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size()); List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
for (Container container : containers) { for (Container container : containers) {
relList.add(container.getId()); relList.add(container.getId());
@ -229,8 +240,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateRequest.setReleaseList(relList); allocateRequest.setReleaseList(relList);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse); Assert.assertNotNull(allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
// The release request will be split and handled by the corresponding UAM. // The release request will be split and handled by the corresponding UAM.
// The release containers returned by the mock resource managers will be // The release containers returned by the mock resource managers will be
@ -244,14 +258,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
LOG.info("Number of containers received in the original request: " LOG.info("Number of containers received in the original request: "
+ Integer.toString(newlyFinished.size())); + Integer.toString(newlyFinished.size()));
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
// Send max 10 heart beats to receive all the containers. If not, we will // Send max 10 heart beats to receive all the containers. If not, we will
// fail the test // fail the test
int numHeartbeat = 0; int numHeartbeat = 0;
while (containersForReleasedContainerIds.size() < relList.size() while (containersForReleasedContainerIds.size() < relList.size()
&& numHeartbeat++ < 10) { && numHeartbeat++ < 10) {
allocateResponse = allocateRequest = Records.newRecord(AllocateRequest.class);
interceptor.allocate(Records.newRecord(AllocateRequest.class)); allocateRequest.setResponseId(lastResponseId);
allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse); Assert.assertNotNull(allocateResponse);
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
newlyFinished = getCompletedContainerIds( newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses()); allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished); containersForReleasedContainerIds.addAll(newlyFinished);
@ -267,65 +288,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
containersForReleasedContainerIds.size()); containersForReleasedContainerIds.size());
} }
private void checkAMRMToken(Token amrmToken) {
if (amrmToken != null) {
// The token should be the one issued by home MockRM
Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0)));
}
}
@Test @Test
public void testMultipleSubClusters() throws Exception { public void testMultipleSubClusters() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
// Register the application RegisterApplicationMasterResponse registerResponse =
RegisterApplicationMasterRequest registerReq = interceptor.registerApplicationMaster(registerReq);
Records.newRecord(RegisterApplicationMasterRequest.class); Assert.assertNotNull(registerResponse);
registerReq.setHost(Integer.toString(testAppId)); lastResponseId = 0;
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse = Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); // Allocate the first batch of containers, with sc1 and sc2 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-2"));
// Allocate the first batch of containers, with sc1 and sc2 active int numberOfContainers = 3;
registerSubCluster(SubClusterId.newInstance("SC-1")); List<Container> containers =
registerSubCluster(SubClusterId.newInstance("SC-2")); getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3; // Allocate the second batch of containers, with sc1 and sc3 active
List<Container> containers = deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
getContainersAndAssert(numberOfContainers, numberOfContainers * 2); registerSubCluster(SubClusterId.newInstance("SC-3"));
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
// Allocate the second batch of containers, with sc1 and sc3 active numberOfContainers = 1;
deRegisterSubCluster(SubClusterId.newInstance("SC-2")); containers.addAll(
registerSubCluster(SubClusterId.newInstance("SC-3")); getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
numberOfContainers = 1; // Allocate the third batch of containers with only in home sub-cluster
containers.addAll( // active
getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate the third batch of containers with only in home sub-cluster numberOfContainers = 2;
// active containers.addAll(
deRegisterSubCluster(SubClusterId.newInstance("SC-1")); getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
deRegisterSubCluster(SubClusterId.newInstance("SC-3")); Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
numberOfContainers = 2; // Release all containers
containers.addAll( releaseContainersAndAssert(containers);
getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Release all containers // Finish the application
releaseContainersAndAssert(containers); FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// Finish the application FinishApplicationMasterResponse finshResponse =
FinishApplicationMasterRequest finishReq = interceptor.finishApplicationMaster(finishReq);
Records.newRecord(FinishApplicationMasterRequest.class); Assert.assertNotNull(finshResponse);
finishReq.setDiagnostics(""); Assert.assertEquals(true, finshResponse.getIsUnregistered());
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse = return null;
interceptor.finishApplicationMaster(finishReq); }
Assert.assertNotNull(finshResponse); });
Assert.assertEquals(true, finshResponse.getIsUnregistered());
} }
/* /*
@ -333,49 +370,58 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
*/ */
@Test @Test
public void testReregister() throws Exception { public void testReregister() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application // Register the application
RegisterApplicationMasterRequest registerReq = RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId)); registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0); registerReq.setRpcPort(0);
registerReq.setTrackingUrl(""); registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq); interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse); Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers // Allocate the first batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1")); registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
interceptor.setShouldReRegisterNext(); interceptor.setShouldReRegisterNext();
int numberOfContainers = 3; int numberOfContainers = 3;
List<Container> containers = List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2); getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
interceptor.setShouldReRegisterNext(); interceptor.setShouldReRegisterNext();
// Release all containers // Release all containers
releaseContainersAndAssert(containers); releaseContainersAndAssert(containers);
interceptor.setShouldReRegisterNext(); interceptor.setShouldReRegisterNext();
// Finish the application // Finish the application
FinishApplicationMasterRequest finishReq = FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class); Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics(""); finishReq.setDiagnostics("");
finishReq.setTrackingUrl(""); finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finshResponse = FinishApplicationMasterResponse finshResponse =
interceptor.finishApplicationMaster(finishReq); interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finshResponse); Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered()); Assert.assertEquals(true, finshResponse.getIsUnregistered());
return null;
}
});
} }
/* /*
@ -442,6 +488,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// Use port number 1001 to let mock RM block in the register call // Use port number 1001 to let mock RM block in the register call
response = interceptor.registerApplicationMaster( response = interceptor.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null)); RegisterApplicationMasterRequest.newInstance(null, 1001, null));
lastResponseId = 0;
} catch (Exception e) { } catch (Exception e) {
LOG.info("Register thread exception", e); LOG.info("Register thread exception", e);
response = null; response = null;
@ -460,9 +507,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
testRecover(null); testRecover(null);
} }
public void testRecover(RegistryOperations registryObj) throws Exception { protected void testRecover(final RegistryOperations registryObj)
ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); throws Exception {
userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() { UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
interceptor = new TestableFederationInterceptor(); interceptor = new TestableFederationInterceptor();
@ -480,6 +529,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq); interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse); Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
@ -492,6 +542,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
getContainersAndAssert(numberOfContainers, numberOfContainers * 2); getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Prepare for Federation Interceptor restart and recover // Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap = Map<String, byte[]> recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId); recoverDataMapForAppAttempt(nmStateStore, attemptId);
@ -517,22 +570,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.recover(recoveredDataMap); interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(Integer.MAX_VALUE,
interceptor.getLastHomeResponseId());
// The first allocate call expects a fail-over exception and re-register // The first allocate call expects a fail-over exception and re-register
int responseId = 10;
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(responseId);
try { try {
interceptor.allocate(allocateRequest); AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse =
interceptor.allocate(allocateRequest);
lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException " Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers"); + " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) { } catch (ApplicationMasterNotRegisteredException e) {
} }
interceptor.registerApplicationMaster(registerReq); interceptor.registerApplicationMaster(registerReq);
Assert.assertEquals(responseId, interceptor.getLastHomeResponseId()); lastResponseId = 0;
// Release all containers // Release all containers
releaseContainersAndAssert(containers); releaseContainersAndAssert(containers);
@ -614,6 +666,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq); interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse); Assert.assertNotNull(registerResponse);
lastResponseId = 0;
} }
} }
@ -629,6 +682,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq); interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse); Assert.assertNotNull(registerResponse);
lastResponseId = 0;
// Register the application second time with a different request obj // Register the application second time with a different request obj
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
@ -637,6 +691,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
registerReq.setTrackingUrl("different"); registerReq.setTrackingUrl("different");
try { try {
registerResponse = interceptor.registerApplicationMaster(registerReq); registerResponse = interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
Assert.fail("Should throw if a different request obj is used"); Assert.fail("Should throw if a different request obj is used");
} catch (YarnException e) { } catch (YarnException e) {
} }
@ -689,20 +744,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
@Test @Test
public void testSecondAttempt() throws Exception { public void testSecondAttempt() throws Exception {
ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); final RegisterApplicationMasterRequest registerReq =
userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() { Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
// Register the application // Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq); interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse); Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
@ -714,10 +771,13 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
List<Container> containers = List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2); getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) { for (Container c : containers) {
System.out.println(c.getId() + " ha"); LOG.info("Allocated container " + c.getId());
} }
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Preserve the mock RM instances for secondaries // Preserve the mock RM instances for secondaries
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries = ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs(); interceptor.getSecondaryRMs();
@ -729,8 +789,20 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor = new TestableFederationInterceptor(null, secondaries); interceptor = new TestableFederationInterceptor(null, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registry)); getConf(), attemptId, "test-user", null, null, null, registry));
registerResponse = interceptor.registerApplicationMaster(registerReq); return null;
}
});
// Update the ugi with new attemptId
ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers // Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(numberOfContainers, Assert.assertEquals(numberOfContainers,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Extends the FederationInterceptor and overrides methods to provide a testable * Extends the FederationInterceptor and overrides methods to provide a testable
* implementation of FederationInterceptor. * implementation of FederationInterceptor.
*/ */
public class TestableFederationInterceptor extends FederationInterceptor { public class TestableFederationInterceptor extends FederationInterceptor {
public static final Logger LOG =
LoggerFactory.getLogger(TestableFederationInterceptor.class);
private ConcurrentHashMap<String, MockResourceManagerFacade> private ConcurrentHashMap<String, MockResourceManagerFacade>
secondaryResourceManagers = new ConcurrentHashMap<>(); secondaryResourceManagers = new ConcurrentHashMap<>();
private AtomicInteger runningIndex = new AtomicInteger(0); private AtomicInteger runningIndex = new AtomicInteger(0);
@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return new TestableUnmanagedAMPoolManager(threadPool); return new TestableUnmanagedAMPoolManager(threadPool);
} }
@Override
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
Configuration conf, ApplicationId appId) {
return new TestableAMRequestHandlerThread(conf, appId);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext, protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return secondaryResourceManagers; return secondaryResourceManagers;
} }
protected MockResourceManagerFacade getSecondaryRM(String scId) {
return secondaryResourceManagers.get(scId);
}
/**
* Drain all aysnc heartbeat threads, comes in two favors:
*
* 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
* pick up all pending heartbeat requests. Not necessarily wait for all
* threads to finish processing the last request. This is used to make sure
* all new UAM are launched by the async threads, but at the same time will
* finish draining while (slow) RM is still processing the last heartbeat
* request.
*
* 2. waitForAsyncHBThreadFinish == true. Wait for all async thread to finish
* processing all heartbeat requests.
*/
protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish)
throws YarnException {
LOG.info("waiting to drain home heartbeat handler");
if (waitForAsyncHBThreadFinish) {
getHomeHeartbeartHandler().drainHeartbeatThread();
} else {
while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
LOG.info("waiting to drain UAM heartbeat handlers");
UnmanagedAMPoolManager uamPool = getUnmanagedAMPool();
if (waitForAsyncHBThreadFinish) {
getUnmanagedAMPool().drainUAMHeartbeats();
} else {
while (true) {
boolean done = true;
for (String scId : uamPool.getAllUAMIds()) {
if (uamPool.getRequestQueueSize(scId) > 0) {
done = false;
break;
}
}
if (done) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
}
protected UserGroupInformation getUGIWithToken(
ApplicationAttemptId appAttemptId) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
ugi.addTokenIdentifier(token);
return ugi;
}
/** /**
* Extends the UnmanagedAMPoolManager and overrides methods to provide a * Extends the UnmanagedAMPoolManager and overrides methods to provide a
* testable implementation of UnmanagedAMPoolManager. * testable implementation of UnmanagedAMPoolManager.
@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix, super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts, "TEST"); keepContainersAcrossApplicationAttempts, "TEST");
setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
} }
/** /**
@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor {
YarnConfiguration.getClusterId(config)); YarnConfiguration.getClusterId(config));
} }
} }
/**
* Wrap the handler thread so it calls from the same user.
*/
protected class TestableAMRequestHandlerThread
extends AMHeartbeatRequestHandler {
public TestableAMRequestHandlerThread(Configuration conf,
ApplicationId applicationId) {
super(conf, applicationId);
}
@Override
public void run() {
try {
getUGIWithToken(getAttemptId())
.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
TestableAMRequestHandlerThread.super.run();
return null;
}
});
} catch (Exception e) {
}
}
}
} }

View File

@ -84,7 +84,6 @@ import com.google.common.annotations.VisibleForTesting;
public class ApplicationMasterService extends AbstractService implements public class ApplicationMasterService extends AbstractService implements
ApplicationMasterProtocol { ApplicationMasterProtocol {
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private static final int PRE_REGISTER_RESPONSE_ID = -1;
private final AMLivelinessMonitor amLivelinessMonitor; private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler; private YarnScheduler rScheduler;
@ -377,11 +376,6 @@ public class ApplicationMasterService extends AbstractService implements
protected static final Allocation EMPTY_ALLOCATION = new Allocation( protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
private int getNextResponseId(int responseId) {
// Loop between 0 to Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;
}
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException { throws YarnException, IOException {
@ -415,8 +409,8 @@ public class ApplicationMasterService extends AbstractService implements
} }
// Normally request.getResponseId() == lastResponse.getResponseId() // Normally request.getResponseId() == lastResponse.getResponseId()
if (getNextResponseId(request.getResponseId()) == lastResponse if (AMRMClientUtils.getNextResponseId(
.getResponseId()) { request.getResponseId()) == lastResponse.getResponseId()) {
// heartbeat one step old, simply return lastReponse // heartbeat one step old, simply return lastReponse
return lastResponse; return lastResponse;
} else if (request.getResponseId() != lastResponse.getResponseId()) { } else if (request.getResponseId() != lastResponse.getResponseId()) {
@ -461,7 +455,8 @@ public class ApplicationMasterService extends AbstractService implements
* need to worry about unregister call occurring in between (which * need to worry about unregister call occurring in between (which
* removes the lock object). * removes the lock object).
*/ */
response.setResponseId(getNextResponseId(lastResponse.getResponseId())); response.setResponseId(
AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
lock.setAllocateResponse(response); lock.setAllocateResponse(response);
return response; return response;
} }
@ -472,7 +467,7 @@ public class ApplicationMasterService extends AbstractService implements
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
// set response id to -1 before application master for the following // set response id to -1 before application master for the following
// attemptID get registered // attemptID get registered
response.setResponseId(PRE_REGISTER_RESPONSE_ID); response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
LOG.info("Registering app attempt : " + attemptId); LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, new AllocateResponseLock(response)); responseMap.put(attemptId, new AllocateResponseLock(response));
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);