YARN-7652. Handle AM register requests asynchronously in FederationInterceptor. Contributed by Botong Huang.

(cherry picked from commit c3d22d3b45)
This commit is contained in:
Inigo Goiri 2018-10-09 10:29:40 -07:00
parent fa4a111037
commit b0900ad310
8 changed files with 215 additions and 230 deletions

View File

@ -59,7 +59,7 @@ public class AMHeartbeatRequestHandler extends Thread {
private int lastResponseId;
public AMHeartbeatRequestHandler(Configuration conf,
ApplicationId applicationId) {
ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
super("AMHeartbeatRequestHandler Heartbeat Handler Thread");
this.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
@ -69,6 +69,7 @@ public class AMHeartbeatRequestHandler extends Thread {
this.conf = conf;
this.applicationId = applicationId;
this.requestQueue = new LinkedBlockingQueue<>();
this.rmProxyRelayer = rmProxyRelayer;
resetLastResponseId();
}
@ -156,13 +157,6 @@ public class AMHeartbeatRequestHandler extends Thread {
this.lastResponseId = 0;
}
/**
* Set the AMRMClientRelayer for RM connection.
*/
public void setAMRMClientRelayer(AMRMClientRelayer relayer) {
this.rmProxyRelayer = relayer;
}
/**
* Set the UGI for RM connection.
*/

View File

@ -181,7 +181,11 @@ public class AMRMClientRelayer extends AbstractService
this.amRegistrationRequest = registerRequest;
}
public void setRMClient(ApplicationMasterProtocol client){
public String getRMIdentifier() {
return this.rmId;
}
public void setRMClient(ApplicationMasterProtocol client) {
this.rmClient = client;
}

View File

@ -134,10 +134,12 @@ public class UnmanagedApplicationManager {
this.submitter = submitter;
this.appNameSuffix = appNameSuffix;
this.userUgi = null;
this.heartbeatHandler =
new AMHeartbeatRequestHandler(this.conf, this.applicationId);
// Relayer's rmClient will be set after the RM connection is created
this.rmProxyRelayer =
new AMRMClientRelayer(null, this.applicationId, rmName);
this.heartbeatHandler = createAMHeartbeatRequestHandler(this.conf,
this.applicationId, this.rmProxyRelayer);
this.connectionInitiated = false;
this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@ -150,6 +152,13 @@ public class UnmanagedApplicationManager {
keepContainersAcrossApplicationAttempts;
}
@VisibleForTesting
protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
Configuration config, ApplicationId appId,
AMRMClientRelayer relayer) {
return new AMHeartbeatRequestHandler(config, appId, relayer);
}
/**
* Launch a new UAM in the resource manager.
*
@ -191,8 +200,6 @@ public class UnmanagedApplicationManager {
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
this.rmProxyRelayer.setRMClient(createRMProxy(
ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken));
this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
this.heartbeatHandler.setUGI(this.userUgi);
}
@ -521,11 +528,6 @@ public class UnmanagedApplicationManager {
return this.heartbeatHandler.getRequestQueueSize();
}
@VisibleForTesting
protected void setHandlerThread(AMHeartbeatRequestHandler thread) {
this.heartbeatHandler = thread;
}
@VisibleForTesting
protected void drainHeartbeatThread() {
this.heartbeatHandler.drainHeartbeatThread();

View File

@ -192,10 +192,15 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private boolean shouldWaitForSyncNextAllocate = false;
// For unit test synchronization
private static Object syncObj = new Object();
private static Object registerSyncObj = new Object();
private static Object allocateSyncObj = new Object();
public static Object getSyncObj() {
return syncObj;
public static Object getRegisterSyncObj() {
return registerSyncObj;
}
public static Object getAllocateSyncObj() {
return allocateSyncObj;
}
public MockResourceManagerFacade(Configuration conf,
@ -275,14 +280,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
shouldReRegisterNext = false;
// Make sure we wait for certain test cases last in the method
synchronized (syncObj) {
syncObj.notifyAll();
synchronized (registerSyncObj) {
registerSyncObj.notifyAll();
// We reuse the port number to indicate whether the unit test want us to
// wait here
if (request.getRpcPort() > 1000) {
LOG.info("Register call in RM start waiting");
try {
syncObj.wait();
registerSyncObj.wait();
LOG.info("Register call in RM wait finished");
} catch (InterruptedException e) {
LOG.info("Register call in RM wait interrupted", e);
@ -349,13 +354,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
// Wait for signal for certain test cases
synchronized (syncObj) {
synchronized (allocateSyncObj) {
if (shouldWaitForSyncNextAllocate) {
shouldWaitForSyncNextAllocate = false;
LOG.info("Allocate call in RM start waiting");
try {
syncObj.wait();
allocateSyncObj.wait();
LOG.info("Allocate call in RM wait finished");
} catch (InterruptedException e) {
LOG.info("Allocate call in RM wait interrupted", e);
@ -416,8 +421,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
Assert.assertTrue("ContainerId " + id
+ " being released is not valid for application: "
+ conf.get("AMRMTOKEN"), found);
+ " being released is not valid for application: " + attemptId,
found);
ids.remove(id);
completedList.add(
@ -427,7 +432,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
LOG.info("Allocating containers: " + containerList.size()
+ " for application attempt: " + conf.get("AMRMTOKEN"));
+ " for application attempt: " + attemptId);
// Always issue a new AMRMToken as if RM rolled master key
Token newAMRMToken = Token.newInstance(new byte[0],
@ -440,7 +445,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
public void setWaitForSyncNextAllocate(boolean wait) {
synchronized (syncObj) {
synchronized (allocateSyncObj) {
shouldWaitForSyncNextAllocate = wait;
}
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.junit.Assert;
@ -65,7 +67,8 @@ public class TestUnmanagedApplicationManager {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
uam = new TestableUnmanagedApplicationManager(conf,
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
"rm");
}
protected void waitForCallBackCountAndCheckZeroPending(
@ -121,7 +124,8 @@ public class TestUnmanagedApplicationManager {
MockResourceManagerFacade rmProxy = uam.getRMProxy();
uam = new TestableUnmanagedApplicationManager(conf,
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
"rm");
uam.setRMProxy(rmProxy);
reAttachUAM(null, attemptId);
@ -186,7 +190,7 @@ public class TestUnmanagedApplicationManager {
});
// Sync obj from mock RM
Object syncObj = MockResourceManagerFacade.getSyncObj();
Object syncObj = MockResourceManagerFacade.getRegisterSyncObj();
// Wait for register call in the thread get into RM and then wake us
synchronized (syncObj) {
@ -365,16 +369,24 @@ public class TestUnmanagedApplicationManager {
/**
* Testable UnmanagedApplicationManager that talks to a mock RM.
*/
public static class TestableUnmanagedApplicationManager
public class TestableUnmanagedApplicationManager
extends UnmanagedApplicationManager {
private MockResourceManagerFacade rmProxy;
public TestableUnmanagedApplicationManager(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
String rmName) {
super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts, "TEST");
keepContainersAcrossApplicationAttempts, rmName);
}
@Override
protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
Configuration config, ApplicationId appId,
AMRMClientRelayer rmProxyRelayer) {
return new TestableAMRequestHandlerThread(config, appId, rmProxyRelayer);
}
@SuppressWarnings("unchecked")
@ -402,4 +414,31 @@ public class TestUnmanagedApplicationManager {
}
}
/**
* Wrap the handler thread so it calls from the same user.
*/
public class TestableAMRequestHandlerThread
extends AMHeartbeatRequestHandler {
public TestableAMRequestHandlerThread(Configuration conf,
ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
super(conf, applicationId, rmProxyRelayer);
}
@Override
public void run() {
try {
getUGIWithToken(attemptId)
.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
TestableAMRequestHandlerThread.super.run();
return null;
}
});
} catch (Exception e) {
LOG.error("Exception running TestableAMRequestHandlerThread", e);
}
}
}
}

View File

@ -167,6 +167,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;
// For unit test synchronization
private Map<SubClusterId, Future<?>> uamRegisterFutures;
/** Thread pool used for asynchronous operations. */
private ExecutorService threadpool;
@ -227,6 +232,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public FederationInterceptor() {
this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
this.asyncResponseSink = new ConcurrentHashMap<>();
this.uamRegistrations = new ConcurrentHashMap<>();
this.uamRegisterFutures = new ConcurrentHashMap<>();
this.threadpool = Executors.newCachedThreadPool();
this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
this.secondaryRelayers = new ConcurrentHashMap<>();
@ -279,8 +286,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
ApplicationMasterProtocol.class, appOwner), appId,
this.homeSubClusterId.toString());
this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId);
this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer);
this.homeHeartbeartHandler =
createHomeHeartbeartHandler(conf, appId, this.homeRMRelayer);
this.homeHeartbeartHandler.setUGI(appOwner);
this.homeHeartbeartHandler.setDaemon(true);
this.homeHeartbeartHandler.start();
@ -615,10 +622,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* Send the requests to the all sub-cluster resource managers. All
* requests are synchronously triggered but sent asynchronously. Later the
* responses will be collected and merged. In addition, it also returns
* the newly registered UAMs.
* responses will be collected and merged.
*/
Registrations newRegistrations = sendRequestsToResourceManagers(requests);
sendRequestsToResourceManagers(requests);
// Wait for the first async response to arrive
long startTime = this.clock.getTime();
@ -646,9 +652,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Merge the containers and NMTokens from the new registrations into
// the response
if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
mergeRegistrationResponses(response,
newRegistrations.getSuccessfulRegistrations());
if (!isNullOrEmpty(this.uamRegistrations)) {
Map<SubClusterId, RegisterApplicationMasterResponse> newRegistrations;
synchronized (this.uamRegistrations) {
newRegistrations = new HashMap<>(this.uamRegistrations);
this.uamRegistrations.clear();
}
mergeRegistrationResponses(response, newRegistrations);
}
// update the responseId and return the final response to AM
@ -850,8 +861,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
@VisibleForTesting
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
Configuration conf, ApplicationId appId) {
return new AMHeartbeatRequestHandler(conf, appId);
Configuration conf, ApplicationId appId,
AMRMClientRelayer rmProxyRelayer) {
return new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer);
}
/**
@ -1107,18 +1119,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*
* @param requests contains the heart beat requests to send to the resource
* manager keyed by the sub-cluster id
* @return the registration responses from the newly added sub-cluster
* resource managers
* @throws YarnException
* @throws IOException
*/
private Registrations sendRequestsToResourceManagers(
private void sendRequestsToResourceManagers(
Map<SubClusterId, AllocateRequest> requests)
throws YarnException, IOException {
// Create new UAM instances for the sub-cluster that we have not seen
// before
Registrations registrations = registerWithNewSubClusters(requests.keySet());
// Create new UAM instances for the sub-cluster that we haven't seen before
List<SubClusterId> newSubClusters =
registerAndAllocateWithNewSubClusters(requests);
// Now that all the registrations are done, send the allocation request
// to the sub-cluster RMs asynchronously and don't wait for the response.
@ -1126,6 +1136,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// response sink, then merged and sent to the application master.
for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
SubClusterId subClusterId = entry.getKey();
if (newSubClusters.contains(subClusterId)) {
// For new sub-clusters, we have already sent the request right after
// register in the async thread
continue;
}
if (subClusterId.equals(this.homeSubClusterId)) {
// Request for the home sub-cluster resource manager
@ -1133,131 +1148,100 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
new HeartbeatCallBack(this.homeSubClusterId, false));
} else {
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
// TODO: This means that the registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
subClusterId);
continue;
throw new YarnException("UAM not found for " + this.attemptId
+ " in sub-cluster " + subClusterId);
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new HeartbeatCallBack(subClusterId, true));
}
}
return registrations;
}
/**
* This method ensures that Unmanaged AMs are created for each of the
* specified sub-cluster specified in the input and registers with the
* corresponding resource managers.
* This method ensures that Unmanaged AMs are created for newly specified
* sub-clusters, registers with the corresponding resource managers and send
* the first allocate request async.
*/
private Registrations registerWithNewSubClusters(
Set<SubClusterId> subClusterSet) throws IOException {
List<SubClusterId> failedRegistrations = new ArrayList<>();
Map<SubClusterId, RegisterApplicationMasterResponse>
successfulRegistrations = new HashMap<>();
private List<SubClusterId> registerAndAllocateWithNewSubClusters(
final Map<SubClusterId, AllocateRequest> requests) throws IOException {
// Check to see if there are any new sub-clusters in this request
// list and create and register Unmanaged AM instance for the new ones
List<String> newSubClusters = new ArrayList<>();
for (SubClusterId subClusterId : subClusterSet) {
List<SubClusterId> newSubClusters = new ArrayList<>();
for (SubClusterId subClusterId : requests.keySet()) {
if (!subClusterId.equals(this.homeSubClusterId)
&& !this.uamPool.hasUAMId(subClusterId.getId())) {
newSubClusters.add(subClusterId.getId());
newSubClusters.add(subClusterId);
}
}
if (newSubClusters.size() > 0) {
final RegisterApplicationMasterRequest registerRequest =
this.amRegistrationRequest;
final AMRMProxyApplicationContext appContext = getApplicationContext();
ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
completionService = new ExecutorCompletionService<>(this.threadpool);
this.uamRegisterFutures.clear();
for (final SubClusterId scId : newSubClusters) {
Future<?> future = this.threadpool.submit(new Runnable() {
@Override
public void run() {
String subClusterId = scId.getId();
for (final String subClusterId : newSubClusters) {
completionService
.submit(new Callable<RegisterApplicationMasterResponseInfo>() {
@Override
public RegisterApplicationMasterResponseInfo call()
throws Exception {
// Create a config loaded with federation on and subclusterId
// for each UAM
YarnConfiguration config = new YarnConfiguration(getConf());
FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId);
// Create a config loaded with federation on and subclusterId
// for each UAM
YarnConfiguration config = new YarnConfiguration(getConf());
FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId);
RegisterApplicationMasterResponse uamResponse = null;
Token<AMRMTokenIdentifier> token = null;
try {
// For appNameSuffix, use subClusterId of the home sub-cluster
token = uamPool.launchUAM(subClusterId, config,
attemptId.getApplicationId(), amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), homeSubClusterId.toString(),
true, subClusterId);
RegisterApplicationMasterResponse uamResponse = null;
Token<AMRMTokenIdentifier> token = null;
try {
// For appNameSuffix, use subClusterId of the home sub-cluster
token = uamPool.launchUAM(subClusterId, config,
attemptId.getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
homeSubClusterId.toString(), true, subClusterId);
secondaryRelayers.put(subClusterId,
uamPool.getAMRMClientRelayer(subClusterId));
secondaryRelayers.put(subClusterId,
uamPool.getAMRMClientRelayer(subClusterId));
uamResponse = uamPool.registerApplicationMaster(subClusterId,
amRegistrationRequest);
} catch (Throwable e) {
LOG.error("Failed to register application master: " + subClusterId
+ " Application: " + attemptId, e);
// TODO: UAM registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
return;
}
uamRegistrations.put(scId, uamResponse);
LOG.info("Successfully registered unmanaged application master: "
+ subClusterId + " ApplicationId: " + attemptId);
uamResponse = uamPool.registerApplicationMaster(subClusterId,
registerRequest);
} catch (Throwable e) {
LOG.error("Failed to register application master: "
+ subClusterId + " Application: " + attemptId, e);
}
return new RegisterApplicationMasterResponseInfo(uamResponse,
SubClusterId.newInstance(subClusterId), token);
}
});
}
// Wait for other sub-cluster resource managers to return the
// response and add it to the Map for returning to the caller
for (int i = 0; i < newSubClusters.size(); ++i) {
try {
Future<RegisterApplicationMasterResponseInfo> future =
completionService.take();
RegisterApplicationMasterResponseInfo uamResponse = future.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Received register application response from RM: "
+ uamResponse.getSubClusterId());
try {
uamPool.allocateAsync(subClusterId, requests.get(scId),
new HeartbeatCallBack(scId, true));
} catch (Throwable e) {
LOG.error("Failed to allocate async to " + subClusterId
+ " Application: " + attemptId, e);
}
if (uamResponse.getResponse() == null) {
failedRegistrations.add(uamResponse.getSubClusterId());
} else {
LOG.info("Successfully registered unmanaged application master: "
+ uamResponse.getSubClusterId() + " ApplicationId: "
+ this.attemptId);
successfulRegistrations.put(uamResponse.getSubClusterId(),
uamResponse.getResponse());
// Save the UAM token in registry or NMSS
// Save the UAM token in registry or NMSS
try {
if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(
this.attemptId.getApplicationId(),
uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken());
registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
subClusterId, token);
} else if (getNMStateStore() != null) {
getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_SECONDARY_SC_PREFIX
+ uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken().encodeToUrlString()
.getBytes(STRING_TO_BYTE_FORMAT));
getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
NMSS_SECONDARY_SC_PREFIX + subClusterId,
token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
}
} catch (Throwable e) {
LOG.error("Failed to persist UAM token from " + subClusterId
+ " Application: " + attemptId, e);
}
} catch (Exception e) {
LOG.warn("Failed to register unmanaged application master: "
+ " ApplicationId: " + this.attemptId, e);
}
}
});
this.uamRegisterFutures.put(scId, future);
}
return new Registrations(successfulRegistrations, failedRegistrations);
return newSubClusters;
}
/**
@ -1573,7 +1557,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
@VisibleForTesting
public int getUnmanagedAMPoolSize() {
protected int getUnmanagedAMPoolSize() {
return this.uamPool.getAllUAMIds().size();
}
@ -1582,6 +1566,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return this.uamPool;
}
@VisibleForTesting
protected Map<SubClusterId, Future<?>> getUamRegisterFutures() {
return this.uamRegisterFutures;
}
@VisibleForTesting
public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
return this.asyncResponseSink;
@ -1614,7 +1603,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
asyncResponseSink.notifyAll();
}
// Notify policy of allocate response
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+ subClusterId, e);
}
// Save the new AMRMToken for the UAM if present
// Do this last because it can be slow...
if (this.isUAM && response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null);
@ -1648,44 +1646,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
}
// Notify policy of allocate response
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+ subClusterId, e);
}
}
}
/**
* Private structure for encapsulating SubClusterId and
* RegisterApplicationMasterResponse instances.
*/
private static class RegisterApplicationMasterResponseInfo {
private RegisterApplicationMasterResponse response;
private SubClusterId subClusterId;
private Token<AMRMTokenIdentifier> uamToken;
RegisterApplicationMasterResponseInfo(
RegisterApplicationMasterResponse response, SubClusterId subClusterId,
Token<AMRMTokenIdentifier> uamToken) {
this.response = response;
this.subClusterId = subClusterId;
this.uamToken = uamToken;
}
public RegisterApplicationMasterResponse getResponse() {
return response;
}
public SubClusterId getSubClusterId() {
return subClusterId;
}
public Token<AMRMTokenIdentifier> getUamToken() {
return uamToken;
}
}
@ -1712,33 +1672,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
/**
* Private structure for encapsulating successful and failed application
* master registration responses.
*/
private static class Registrations {
private Map<SubClusterId, RegisterApplicationMasterResponse>
successfulRegistrations;
private List<SubClusterId> failedRegistrations;
Registrations(
Map<SubClusterId, RegisterApplicationMasterResponse>
successfulRegistrations,
List<SubClusterId> failedRegistrations) {
this.successfulRegistrations = successfulRegistrations;
this.failedRegistrations = failedRegistrations;
}
public Map<SubClusterId, RegisterApplicationMasterResponse>
getSuccessfulRegistrations() {
return this.successfulRegistrations;
}
public List<SubClusterId> getFailedRegistrations() {
return this.failedRegistrations;
}
}
/**
* Utility method to check if the specified Collection is null or empty.
*

View File

@ -201,9 +201,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
LOG.info("Number of allocated containers in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
@ -217,8 +214,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers());
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
LOG.info("Total number of allocated containers: "
@ -258,9 +257,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
LOG.info("Number of containers received in the original request: "
+ Integer.toString(newlyFinished.size()));
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
@ -273,10 +269,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
checkAMRMToken(allocateResponse.getAMRMToken());
lastResponseId = allocateResponse.getResponseId();
// Make sure this request is picked up by all async heartbeat handlers
interceptor.drainAllAsyncQueue(false);
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: "
+ Integer.toString(newlyFinished.size()));
LOG.info("Total number of containers received: "
@ -438,7 +436,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
new ExecutorCompletionService<>(threadpool);
Object syncObj = MockResourceManagerFacade.getSyncObj();
Object syncObj = MockResourceManagerFacade.getRegisterSyncObj();
// Two register threads
synchronized (syncObj) {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
@ -69,8 +70,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
@Override
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
Configuration conf, ApplicationId appId) {
return new TestableAMRequestHandlerThread(conf, appId);
Configuration conf, ApplicationId appId,
AMRMClientRelayer rmProxyRelayer) {
return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
}
@SuppressWarnings("unchecked")
@ -205,7 +207,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
String rmId) {
return new TestableUnmanagedApplicationManager(conf, appId, queueName,
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
rmId);
}
}
@ -218,10 +221,17 @@ public class TestableFederationInterceptor extends FederationInterceptor {
public TestableUnmanagedApplicationManager(Configuration conf,
ApplicationId appId, String queueName, String submitter,
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
String rmName) {
super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts, "TEST");
setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
keepContainersAcrossApplicationAttempts, rmName);
}
@Override
protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
Configuration conf, ApplicationId appId,
AMRMClientRelayer rmProxyRelayer) {
return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
}
/**
@ -244,8 +254,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
protected class TestableAMRequestHandlerThread
extends AMHeartbeatRequestHandler {
public TestableAMRequestHandlerThread(Configuration conf,
ApplicationId applicationId) {
super(conf, applicationId);
ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
super(conf, applicationId, rmProxyRelayer);
}
@Override