YARN-6511. Federation: transparently spanning application across multiple sub-clusters. (Botong Huang via Subru).

(cherry picked from commit 8c988d235e)
Subru Krishnan 2017-06-07 14:45:51 -07:00 committed by Carlo Curino
parent bed1832c93
commit 70b1a757f1
5 changed files with 1095 additions and 36 deletions

FederationPolicyUtils.java (new file)

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class for Federation policy.
*/
@Private
public final class FederationPolicyUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FederationPolicyUtils.class);
/** Disable constructor. */
private FederationPolicyUtils() {
}
/**
* A utility method to instantiate a policy manager class given its type
* (class name) from {@link SubClusterPolicyConfiguration}.
*
* @param newType class name of the policy manager to create
* @return Policy manager
* @throws FederationPolicyInitializationException if instantiation fails
*/
public static FederationPolicyManager instantiatePolicyManager(String newType)
throws FederationPolicyInitializationException {
FederationPolicyManager federationPolicyManager = null;
try {
// create policy instance and set queue
Class<?> c = Class.forName(newType);
federationPolicyManager = (FederationPolicyManager) c.newInstance();
} catch (ClassNotFoundException e) {
throw new FederationPolicyInitializationException(e);
} catch (InstantiationException e) {
throw new FederationPolicyInitializationException(e);
} catch (IllegalAccessException e) {
throw new FederationPolicyInitializationException(e);
}
return federationPolicyManager;
}
/**
* Get Federation policy configuration from state store, using default queue
* and configuration as fallback.
*
* @param queue the queue of the application
* @param conf the Yarn configuration
* @param federationFacade state store facade
* @return the retrieved or recreated SubClusterPolicyConfiguration
*/
public static SubClusterPolicyConfiguration loadPolicyConfiguration(
String queue, Configuration conf,
FederationStateStoreFacade federationFacade) {
// The facade might cache this request, based on its parameterization
SubClusterPolicyConfiguration configuration = null;
if (queue != null) {
try {
configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) {
LOG.warn("Failed to get policy from FederationFacade with queue "
+ queue + ": " + e.getMessage());
}
}
// If there is no policy configured for this queue, fallback to the baseline
// policy that is configured either in the store or via XML config
if (configuration == null) {
LOG.info("No policy configured for queue {} in StateStore,"
+ " fallback to default queue", queue);
queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
try {
configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) {
LOG.warn("No fallback behavior defined in store, defaulting to XML "
+ "configuration fallback behavior.");
}
}
// Otherwise, fall back to the local XML configuration.
if (configuration == null) {
LOG.info("No policy configured for default queue {} in StateStore,"
+ " fallback to local config", queue);
String defaultFederationPolicyManager =
conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
String defaultPolicyParamString =
conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
ByteBuffer defaultPolicyParam = ByteBuffer
.wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
configuration = SubClusterPolicyConfiguration.newInstance(queue,
defaultFederationPolicyManager, defaultPolicyParam);
}
return configuration;
}
/**
* Get AMRMProxy policy from state store, using default queue and
* configuration as fallback.
*
* @param queue the queue of the application
* @param oldPolicy the previous policy instance (can be null)
* @param conf the Yarn configuration
* @param federationFacade state store facade
* @param homeSubClusterId home sub-cluster id
* @return the recreated FederationAMRMProxyPolicy
* @throws FederationPolicyInitializationException if initialization fails
*/
public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
FederationAMRMProxyPolicy oldPolicy, Configuration conf,
FederationStateStoreFacade federationFacade,
SubClusterId homeSubClusterId)
throws FederationPolicyInitializationException {
// Local policy and its configuration
SubClusterPolicyConfiguration configuration =
loadPolicyConfiguration(queue, conf, federationFacade);
// Instantiate the policyManager and get policy
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(configuration,
federationFacade.getSubClusterResolver(), federationFacade,
homeSubClusterId);
LOG.info("Creating policy manager of type: " + configuration.getType());
FederationPolicyManager federationPolicyManager =
instantiatePolicyManager(configuration.getType());
// Set the queue, reinit the policy if required (the implementation lazily
// checks the content of the conf), and cache it
federationPolicyManager.setQueue(configuration.getQueue());
return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
}
}
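A note on the fallback chain in loadPolicyConfiguration above: the lookup order is (1) the application's queue in the StateStore, (2) the default queue in the StateStore, (3) the local XML configuration. A minimal sketch of exercising that last fallback, assuming only classes already imported in this patch (UniformBroadcastPolicyManager is the manager the new test below configures as well):

Configuration conf = new YarnConfiguration();
// XML-fallback policy manager, consulted only when neither the application
// queue nor the default queue has a policy in the StateStore.
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
    UniformBroadcastPolicyManager.class.getName());
// Optional serialized parameters; loadPolicyConfiguration wraps them into
// the policy's ByteBuffer.
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS, "");
// The chosen type is then instantiated reflectively, as the utility does:
FederationPolicyManager manager = FederationPolicyUtils
    .instantiatePolicyManager(UniformBroadcastPolicyManager.class.getName());
manager.setQueue(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);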

RouterPolicyFacade.java

@ -95,7 +95,7 @@ public class RouterPolicyFacade {
new FederationPolicyInitializationContext(configuration,
subClusterResolver, federationFacade, homeSubcluster);
FederationPolicyManager fallbackPolicyManager =
FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
fallbackPolicyManager.setQueue(defaulKey);
// add to the cache the fallback behavior
@ -209,7 +209,7 @@ public class RouterPolicyFacade {
FederationRouterPolicy routerPolicy = policyMap.get(queue);
FederationPolicyManager federationPolicyManager =
FederationPolicyUtils.instantiatePolicyManager(newType);
// Set the queue, reinit the policy if required (the implementation lazily
// checks the content of the conf), and cache it
federationPolicyManager.setQueue(queue);
@ -224,23 +224,6 @@ public class RouterPolicyFacade {
}
}
/**
* This method flushes all cached configurations and policies. This should be
* invoked if the facade remains active after a very large churn of queues in

FederationInterceptor.java

@ -24,7 +24,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@ -38,20 +45,35 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Extends the AbstractRequestInterceptor and provides an implementation for
* federation of YARN RM and scaling an application across multiple YARN
@ -69,6 +91,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private ApplicationMasterProtocol homeRM;
private SubClusterId homeSubClusterId;
/**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
* using subClusterId as uamId. One UAM is created per sub-cluster RM except
* the home RM.
*
* Creation and registration of UAMs in secondary sub-clusters happen
* on demand, when the AMRMProxy policy routes resource requests to these
* sub-clusters for the first time. AM heart beats to them are also handled
* asynchronously for performance reasons.
*/
private UnmanagedAMPoolManager uamPool;
/** Thread pool used for asynchronous operations. */
private ExecutorService threadpool;
/**
* Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers except the home RM.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
/**
* Used to keep track of the container ID and the sub-cluster RM that created
* the container, so that we know which sub-cluster to forward later requests
@ -89,7 +132,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private RegisterApplicationMasterResponse amRegistrationResponse;
private FederationStateStoreFacade federationFacade;
private SubClusterResolver subClusterResolver;
/** The policy used to split requests among sub-clusters. */
private FederationAMRMProxyPolicy policyInterpreter;
/**
* The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
* issued by home RM.
*/
private UserGroupInformation appOwner;
/**
@ -97,6 +150,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
public FederationInterceptor() {
this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
this.asyncResponseSink = new ConcurrentHashMap<>();
this.threadpool = Executors.newCachedThreadPool();
this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
this.amRegistrationRequest = null;
this.amRegistrationResponse = null;
}
@ -126,6 +183,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRM = createHomeRMProxy(appContext);
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
// AMRMProxyPolicy will be initialized in registerApplicationMaster
this.policyInterpreter = null;
this.uamPool.init(conf);
this.uamPool.start();
}
/**
@ -202,7 +268,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (queue == null) {
LOG.warn("Received null queue for application "
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
+ " from home subcluster. Will use default queue name "
+ " from home sub-cluster. Will use default queue name "
+ YarnConfiguration.DEFAULT_QUEUE_NAME
+ " for getting AMRMProxyPolicy");
} else {
@ -211,6 +277,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ " belongs to queue " + queue);
}
// Initialize the AMRMProxyPolicy
try {
this.policyInterpreter =
FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
getConf(), this.federationFacade, this.homeSubClusterId);
} catch (FederationPolicyInitializationException e) {
throw new YarnRuntimeException(e);
}
return this.amRegistrationResponse;
}
@ -221,6 +295,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException {
Preconditions.checkArgument(this.policyInterpreter != null,
"Allocate should be called after registerApplicationMaster");
try {
// Split the heart beat request into multiple requests, one for each
@ -228,12 +304,28 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<SubClusterId, AllocateRequest> requests =
splitAllocateRequest(request);
// Send the requests to the secondary sub-cluster resource managers.
// These secondary requests are sent asynchronously and the responses will
// be collected and merged with the home response. In addition, this call
// returns the newly registered Unmanaged AMs.
Registrations newRegistrations =
sendRequestsToSecondaryResourceManagers(requests);
// Send the request to the home RM and get the response
AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
requests.get(this.homeSubClusterId), this.homeRM,
this.amRegistrationRequest,
getApplicationContext().getApplicationAttemptId());
// Notify policy of home response
try {
this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
homeResponse);
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+ this.homeSubClusterId, e);
}
// If the resource manager sent us a new token, add to the current user
if (homeResponse.getAMRMToken() != null) {
LOG.debug("Received new AMRMToken");
@ -244,6 +336,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Merge the responses from home and secondary sub-cluster RMs
homeResponse = mergeAllocateResponses(homeResponse);
// Merge the containers and NMTokens from the new registrations into
// the homeResponse.
if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
homeResponse = mergeRegistrationResponses(homeResponse,
newRegistrations.getSuccessfulRegistrations());
}
// return the final response to the application master.
return homeResponse;
} catch (IOException ex) {
@ -261,10 +360,83 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
FinishApplicationMasterRequest request)
throws YarnException, IOException {
// TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
boolean failedToUnRegister = false;
ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
null;
// The application master is finishing its operation. Send the finish
// application master request to all the registered sub-cluster resource
// managers in parallel, wait for the responses and aggregate the results.
Set<String> subClusterIds = this.uamPool.getAllUAMIds();
if (subClusterIds.size() > 0) {
final FinishApplicationMasterRequest finishRequest = request;
compSvc =
new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(
this.threadpool);
LOG.info("Sending finish application request to {} sub-cluster RMs",
subClusterIds.size());
for (final String subClusterId : subClusterIds) {
compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() {
@Override
public FinishApplicationMasterResponseInfo call() throws Exception {
LOG.info("Sending finish application request to RM {}",
subClusterId);
FinishApplicationMasterResponse uamResponse = null;
try {
uamResponse =
uamPool.finishApplicationMaster(subClusterId, finishRequest);
} catch (Throwable e) {
LOG.warn("Failed to finish unmanaged application master: "
+ "RM address: " + subClusterId + " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId(), e);
}
return new FinishApplicationMasterResponseInfo(uamResponse,
subClusterId);
}
});
}
}
// While the finish application request is being processed
// asynchronously by other sub-cluster resource managers, send the same
// request to the home resource manager on this thread.
FinishApplicationMasterResponse homeResponse =
AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
this.amRegistrationRequest,
getApplicationContext().getApplicationAttemptId());
if (subClusterIds.size() > 0) {
// Wait for other sub-cluster resource managers to return the
// response and merge it with the home response
LOG.info(
"Waiting for finish application response from {} sub-cluster RMs",
subClusterIds.size());
for (int i = 0; i < subClusterIds.size(); ++i) {
try {
Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
FinishApplicationMasterResponseInfo uamResponse = future.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Received finish application response from RM: "
+ uamResponse.getSubClusterId());
}
if (uamResponse.getResponse() == null
|| !uamResponse.getResponse().getIsUnregistered()) {
failedToUnRegister = true;
}
} catch (Throwable e) {
failedToUnRegister = true;
LOG.warn("Failed to finish unmanaged application master: "
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId(), e);
}
}
}
if (failedToUnRegister) {
homeResponse.setIsUnregistered(false);
}
return homeResponse;
}
@ -281,9 +453,32 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
@Override
public void shutdown() {
if (this.uamPool != null) {
this.uamPool.stop();
}
if (threadpool != null) {
try {
threadpool.shutdown();
} catch (Throwable ex) {
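// Best effort: ignore failures while shutting down the thread pool.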
}
threadpool = null;
}
super.shutdown();
}
/**
* Creates the UAM pool manager for secondary sub-clusters. Allows unit tests
* to override.
*
* @param threadPool the thread pool to use
* @return the UAM pool manager instance
*/
@VisibleForTesting
protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
ExecutorService threadPool) {
return new UnmanagedAMPoolManager(threadPool);
}
/**
* Returns instance of the ApplicationMasterProtocol proxy class that is used
* to connect to the Home resource manager.
@ -302,6 +497,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
private SubClusterId getSubClusterForNode(String nodeName) {
SubClusterId subClusterId = null;
try {
subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
} catch (YarnException e) {
LOG.error("Failed to resolve sub-cluster for node " + nodeName
+ ", skipping this node", e);
return null;
}
if (subClusterId == null) {
LOG.error("Failed to resolve sub-cluster for node {}, skipping this node",
nodeName);
return null;
}
return subClusterId;
}
/**
* In federation, the heart beat request needs to be sent to all the sub-
* clusters from which the AM has requested containers. This method splits the
@ -317,20 +529,39 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
requestMap);
// Create heart beat request instances for all other already registered
// sub-cluster resource managers
Set<String> subClusterIds = this.uamPool.getAllUAMIds();
for (String subClusterId : subClusterIds) {
findOrCreateAllocateRequestForSubCluster(
SubClusterId.newInstance(subClusterId), request, requestMap);
}
if (!isNullOrEmpty(request.getAskList())) {
// Ask the federation policy interpreter to split the ask list for
// sending it to all the sub-cluster resource managers.
Map<SubClusterId, List<ResourceRequest>> asks =
splitResourceRequests(request.getAskList());
// Add the askLists to the corresponding sub-cluster requests.
for (Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) {
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
entry.getKey(), request, requestMap);
newRequest.getAskList().addAll(entry.getValue());
}
}
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
request.getResourceBlacklistRequest().getBlacklistAdditions())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistAdditions()) {
SubClusterId subClusterId = getSubClusterForNode(resourceName);
if (subClusterId != null) {
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
subClusterId, request, requestMap);
newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
.add(resourceName);
}
}
}
@ -338,10 +569,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
request.getResourceBlacklistRequest().getBlacklistRemovals())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistRemovals()) {
SubClusterId subClusterId = getSubClusterForNode(resourceName);
if (subClusterId != null) {
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
subClusterId, request, requestMap);
newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
.add(resourceName);
}
}
}
@ -370,6 +604,174 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return requestMap;
}
/**
* This method sends the specified AllocateRequests to the appropriate
* sub-cluster resource managers.
*
* @param requests contains the heart beat requests to send to the resource
* managers, keyed by sub-cluster id
* @return the registration responses from the newly added sub-cluster
* resource managers
* @throws YarnException
* @throws IOException
*/
private Registrations sendRequestsToSecondaryResourceManagers(
Map<SubClusterId, AllocateRequest> requests)
throws YarnException, IOException {
// Create new UAM instances for the sub-clusters that we have not seen
// before
Registrations registrations = registerWithNewSubClusters(requests.keySet());
// Now that all the registrations are done, send the allocation request
// to the sub-cluster RMs using the Unmanaged application masters
// asynchronously and don't wait for the response. The responses will
// arrive asynchronously and will be added to the response sink. These
// responses will be sent to the application master in some future heart
// beat response.
for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
final SubClusterId subClusterId = entry.getKey();
if (subClusterId.equals(this.homeSubClusterId)) {
// Skip the request for the home sub-cluster resource manager.
// It will be handled separately in the allocate() method
continue;
}
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
// TODO: This means that the registration for this sub-cluster RM
// failed. For now, we ignore the resource requests and continue
// but we need to fix this and handle this situation. One way would
// be to send the request to another RM by consulting the policy.
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
subClusterId);
continue;
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new AsyncCallback<AllocateResponse>() {
@Override
public void callback(AllocateResponse response) {
synchronized (asyncResponseSink) {
List<AllocateResponse> responses = null;
if (asyncResponseSink.containsKey(subClusterId)) {
responses = asyncResponseSink.get(subClusterId);
} else {
responses = new ArrayList<>();
asyncResponseSink.put(subClusterId, responses);
}
responses.add(response);
}
// Notify policy of secondary sub-cluster responses
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
LOG.warn(
"notifyOfResponse for policy failed for home sub-cluster "
+ subClusterId,
e);
}
}
});
}
return registrations;
}
/**
* This method ensures that Unmanaged AMs are created for each sub-cluster
* specified in the input, and registers them with the corresponding resource
* managers.
*/
private Registrations registerWithNewSubClusters(
Set<SubClusterId> subClusterSet) throws IOException {
List<SubClusterId> failedRegistrations = new ArrayList<>();
Map<SubClusterId, RegisterApplicationMasterResponse>
successfulRegistrations = new HashMap<>();
// Check to see if there are any new sub-clusters in this request
// list, and create and register an Unmanaged AM instance for each new one
List<String> newSubClusters = new ArrayList<>();
for (SubClusterId subClusterId : subClusterSet) {
if (!subClusterId.equals(this.homeSubClusterId)
&& !this.uamPool.hasUAMId(subClusterId.getId())) {
newSubClusters.add(subClusterId.getId());
}
}
if (newSubClusters.size() > 0) {
final RegisterApplicationMasterRequest registerRequest =
this.amRegistrationRequest;
final AMRMProxyApplicationContext appContext = getApplicationContext();
ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
completionService = new ExecutorCompletionService<>(threadpool);
for (final String subClusterId : newSubClusters) {
completionService
.submit(new Callable<RegisterApplicationMasterResponseInfo>() {
@Override
public RegisterApplicationMasterResponseInfo call()
throws Exception {
// Create a config with federation enabled and the sub-cluster id set,
// one per UAM
YarnConfiguration config = new YarnConfiguration(getConf());
FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId);
RegisterApplicationMasterResponse uamResponse = null;
try {
// For appNameSuffix, use subClusterId of the home sub-cluster
uamResponse = uamPool.createAndRegisterNewUAM(subClusterId,
registerRequest, config,
appContext.getApplicationAttemptId().getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
homeSubClusterId.toString());
} catch (Throwable e) {
LOG.error("Failed to register application master: "
+ subClusterId + " Application: "
+ appContext.getApplicationAttemptId(), e);
}
return new RegisterApplicationMasterResponseInfo(uamResponse,
SubClusterId.newInstance(subClusterId));
}
});
}
// Wait for other sub-cluster resource managers to return the
// response and add it to the Map for returning to the caller
for (int i = 0; i < newSubClusters.size(); ++i) {
try {
Future<RegisterApplicationMasterResponseInfo> future =
completionService.take();
RegisterApplicationMasterResponseInfo uamResponse = future.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Received register application response from RM: "
+ uamResponse.getSubClusterId());
}
if (uamResponse.getResponse() == null) {
failedRegistrations.add(uamResponse.getSubClusterId());
} else {
LOG.info("Successfully registered unmanaged application master: "
+ uamResponse.getSubClusterId() + " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId());
successfulRegistrations.put(uamResponse.getSubClusterId(),
uamResponse.getResponse());
}
} catch (Exception e) {
LOG.warn("Failed to register unmanaged application master: "
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId(), e);
}
}
}
return new Registrations(successfulRegistrations, failedRegistrations);
}
/**
* Merges the responses from other sub-clusters that we received
* asynchronously with the specified home cluster response and keeps track of
@ -388,6 +790,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
this.homeSubClusterId);
synchronized (this.asyncResponseSink) {
for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
.entrySet()) {
SubClusterId subClusterId = entry.getKey();
List<AllocateResponse> responses = entry.getValue();
if (responses.size() > 0) {
for (AllocateResponse response : responses) {
removeFinishedContainersFromCache(
response.getCompletedContainersStatuses());
cacheAllocatedContainers(response.getAllocatedContainers(),
subClusterId);
mergeAllocateResponse(homeResponse, response, subClusterId);
}
responses.clear();
}
}
}
return homeResponse;
}
@ -404,6 +824,130 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
/**
* Helper method for merging the responses from the secondary sub-cluster RMs
* with the home response to return to the AM.
*/
private AllocateResponse mergeRegistrationResponses(
AllocateResponse homeResponse,
Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
registrations.entrySet()) {
RegisterApplicationMasterResponse registration = entry.getValue();
if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
List<Container> tempContainers = homeResponse.getAllocatedContainers();
if (!isNullOrEmpty(tempContainers)) {
tempContainers
.addAll(registration.getContainersFromPreviousAttempts());
homeResponse.setAllocatedContainers(tempContainers);
} else {
homeResponse.setAllocatedContainers(
registration.getContainersFromPreviousAttempts());
}
cacheAllocatedContainers(
registration.getContainersFromPreviousAttempts(), entry.getKey());
}
if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) {
List<NMToken> tempTokens = homeResponse.getNMTokens();
if (!isNullOrEmpty(tempTokens)) {
tempTokens.addAll(registration.getNMTokensFromPreviousAttempts());
homeResponse.setNMTokens(tempTokens);
} else {
homeResponse
.setNMTokens(registration.getNMTokensFromPreviousAttempts());
}
}
}
return homeResponse;
}
private void mergeAllocateResponse(AllocateResponse homeResponse,
AllocateResponse otherResponse, SubClusterId otherRMAddress) {
if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
homeResponse.getAllocatedContainers()
.addAll(otherResponse.getAllocatedContainers());
} else {
homeResponse
.setAllocatedContainers(otherResponse.getAllocatedContainers());
}
}
if (otherResponse.getAvailableResources() != null) {
if (homeResponse.getAvailableResources() != null) {
homeResponse.setAvailableResources(
Resources.add(homeResponse.getAvailableResources(),
otherResponse.getAvailableResources()));
} else {
homeResponse
.setAvailableResources(otherResponse.getAvailableResources());
}
}
if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
homeResponse.getCompletedContainersStatuses()
.addAll(otherResponse.getCompletedContainersStatuses());
} else {
homeResponse.setCompletedContainersStatuses(
otherResponse.getCompletedContainersStatuses());
}
}
if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) {
if (!isNullOrEmpty(homeResponse.getUpdatedNodes())) {
homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
} else {
homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
}
}
if (!isNullOrEmpty(otherResponse.getNMTokens())) {
if (!isNullOrEmpty(homeResponse.getNMTokens())) {
homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
} else {
homeResponse.setNMTokens(otherResponse.getNMTokens());
}
}
PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
if (homePreempMessage == null && otherPreempMessage != null) {
homeResponse.setPreemptionMessage(otherPreempMessage);
}
if (homePreempMessage != null && otherPreempMessage != null) {
PreemptionContract par1 = homePreempMessage.getContract();
PreemptionContract par2 = otherPreempMessage.getContract();
if (par1 == null && par2 != null) {
homePreempMessage.setContract(par2);
}
if (par1 != null && par2 != null) {
par1.getResourceRequest().addAll(par2.getResourceRequest());
par1.getContainers().addAll(par2.getContainers());
}
StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();
StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract();
if (spar1 == null && spar2 != null) {
homePreempMessage.setStrictContract(spar2);
}
if (spar1 != null && spar2 != null) {
spar1.getContainers().addAll(spar2.getContainers());
}
}
}
/**
* Add allocated containers to cache mapping.
*/
@ -418,10 +962,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// container allocation more than once. Just move on in this case.
LOG.warn(
"Duplicate containerID: {} found in the allocated containers"
+ " from same subcluster: {}, so ignoring.",
+ " from same sub-cluster: {}, so ignoring.",
container.getId(), subClusterId);
} else {
// The same container allocation from different sub-clusters,
// something is wrong.
// TODO: YARN-6667 if some subcluster RM is configured wrong, we
// should not fail the entire heartbeat.
@ -432,7 +976,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId()
+ " From RM: " + subClusterId
+ " . Previous container was from subcluster: "
+ " . Previous container was from sub-cluster: "
+ existingSubClusterId);
}
}
@ -498,7 +1042,102 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
* Splits the specified request to send it to different sub-clusters. The
* splitting algorithm is very simple. If the request does not have a node
* preference, the policy decides the sub-cluster. If the request has a node
* preference and locality is required, then it is sent to the sub-cluster
* that contains the requested node. If a node preference is specified but
* locality is not required, then the policy decides the sub-cluster.
*
* @param askList the ask list to split
* @return the split asks
* @throws YarnException if split fails
*/
protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> askList) throws YarnException {
return this.policyInterpreter.splitResourceRequests(askList);
}
@VisibleForTesting
public int getUnmanagedAMPoolSize() {
return this.uamPool.getAllUAMIds().size();
}
/**
* Private structure for encapsulating SubClusterId and
* RegisterApplicationMasterResponse instances.
*/
private static class RegisterApplicationMasterResponseInfo {
private RegisterApplicationMasterResponse response;
private SubClusterId subClusterId;
RegisterApplicationMasterResponseInfo(
RegisterApplicationMasterResponse response, SubClusterId subClusterId) {
this.response = response;
this.subClusterId = subClusterId;
}
public RegisterApplicationMasterResponse getResponse() {
return response;
}
public SubClusterId getSubClusterId() {
return subClusterId;
}
}
/**
* Private structure for encapsulating SubClusterId and
* FinishApplicationMasterResponse instances.
*/
private static class FinishApplicationMasterResponseInfo {
private FinishApplicationMasterResponse response;
private String subClusterId;
FinishApplicationMasterResponseInfo(
FinishApplicationMasterResponse response, String subClusterId) {
this.response = response;
this.subClusterId = subClusterId;
}
public FinishApplicationMasterResponse getResponse() {
return response;
}
public String getSubClusterId() {
return subClusterId;
}
}
/**
* Private structure for encapsulating successful and failed application
* master registration responses.
*/
private static class Registrations {
private Map<SubClusterId, RegisterApplicationMasterResponse>
successfulRegistrations;
private List<SubClusterId> failedRegistrations;
Registrations(
Map<SubClusterId, RegisterApplicationMasterResponse>
successfulRegistrations,
List<SubClusterId> failedRegistrations) {
this.successfulRegistrations = successfulRegistrations;
this.failedRegistrations = failedRegistrations;
}
public Map<SubClusterId, RegisterApplicationMasterResponse>
getSuccessfulRegistrations() {
return this.successfulRegistrations;
}
public List<SubClusterId> getFailedRegistrations() {
return this.failedRegistrations;
}
}
/**
* Utility method to check if the specified Collection is null or empty.
*
* @param c the collection object
* @param <T> element type of the collection
@ -507,4 +1146,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public static <T> boolean isNullOrEmpty(Collection<T> c) {
return (c == null || c.size() == 0);
}
/**
* Utility method to check if the specified Collection is null or empty.
*
* @param c the map object
* @param <T1> key type of the map
* @param <T2> value type of the map
* @return whether it is null or empty
*/
public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
return (c == null || c.size() == 0);
}
}
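To make the splitting rules in splitResourceRequests above concrete, here is an illustrative sketch (not part of this patch) of the two kinds of asks the policy interpreter distinguishes; it assumes only the standard ResourceRequest, Priority, and Resource record APIs:

// Strict node-local ask: relaxLocality == false forces routing to the
// sub-cluster that owns "node1", resolved via the SubClusterResolver.
ResourceRequest strictAsk = ResourceRequest.newInstance(
    Priority.newInstance(1), "node1",
    Resource.newInstance(1024, 1), 1, false);
// ANY ask (no node preference): the AMRMProxy policy alone decides which
// sub-cluster(s) receive it, e.g. a broadcast under
// UniformBroadcastPolicyManager.
ResourceRequest anyAsk = ResourceRequest.newInstance(
    Priority.newInstance(1), ResourceRequest.ANY,
    Resource.newInstance(1024, 1), 1);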

TestFederationInterceptor.java

@ -19,13 +19,31 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -45,6 +63,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
public static final String HOME_SC_ID = "SC-home";
private TestableFederationInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private int testAppId;
private ApplicationAttemptId attemptId;
@ -54,6 +73,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
super.setUp();
interceptor = new TestableFederationInterceptor();
stateStore = new MemoryFederationStateStore();
stateStore.init(getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
testAppId = 1;
attemptId = getApplicationAttemptId(testAppId);
interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
@ -82,11 +106,238 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
private void registerSubCluster(SubClusterId subClusterId)
throws YarnException {
stateStore
.registerSubCluster(SubClusterRegisterRequest.newInstance(SubClusterInfo
.newInstance(subClusterId, "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3",
"1.2.3.4:4", SubClusterState.SC_RUNNING, 0, "capacity")));
}
private void deRegisterSubCluster(SubClusterId subClusterId)
throws YarnException {
stateStore.deregisterSubCluster(SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
}
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExpected) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList =
new ArrayList<ResourceRequest>(numberOfResourceRequests);
for (int id = 0; id < numberOfResourceRequests; id++) {
askList.add(createResourceRequest("test-node-" + Integer.toString(id),
6000, 2, id % 5, 1));
}
allocateRequest.setAskList(askList);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response", allocateResponse);
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
// Send at most 10 heart beats to receive all the containers. If they do
// not all arrive, fail the test
int numHeartbeat = 0;
while (containers.size() < numberOfAllocationExpected
&& numHeartbeat++ < 10) {
allocateResponse =
interceptor.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNotNull("allocate() returned null response",
allocateResponse);
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
LOG.info("Total number of allocated containers: "
+ Integer.toString(containers.size()));
Thread.sleep(10);
}
Assert.assertEquals(numberOfAllocationExpected, containers.size());
return containers;
}
private void releaseContainersAndAssert(List<Container> containers)
throws Exception {
Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
}
allocateRequest.setReleaseList(relList);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse);
// The mock resource manager is set up so that it returns the released
// containers in the allocated containers list. The release request will
// be split and handled by the corresponding UAM. The released containers
// returned by the mock resource managers will be aggregated and returned
// to us, so we can check that the total request size and the returned
// size are the same
List<Container> containersForReleasedContainerIds =
new ArrayList<Container>();
containersForReleasedContainerIds
.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of containers received in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
// Send at most 10 heart beats to receive all the containers. If they do
// not all arrive, fail the test
int numHeartbeat = 0;
while (containersForReleasedContainerIds.size() < relList.size()
&& numHeartbeat++ < 10) {
allocateResponse =
interceptor.allocate(Records.newRecord(AllocateRequest.class));
Assert.assertNotNull(allocateResponse);
containersForReleasedContainerIds
.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of containers received in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
LOG.info("Total number of containers received: "
+ Integer.toString(containersForReleasedContainerIds.size()));
Thread.sleep(10);
}
Assert.assertEquals(relList.size(),
containersForReleasedContainerIds.size());
}
@Test
public void testMultipleSubClusters() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers, with sc1 and sc2 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-2"));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
// Allocate the second batch of containers, with sc1 and sc3 active
deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
registerSubCluster(SubClusterId.newInstance("SC-3"));
numberOfContainers = 1;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Allocate the third batch of containers with only the home sub-cluster
// active
deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
numberOfContainers = 2;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
}
/*
* Test re-register when RM fails over.
*/
@Test
public void testReregister() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
interceptor.setShouldReRegisterNext();
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
interceptor.setShouldReRegisterNext();
// Release all containers
releaseContainersAndAssert(containers);
interceptor.setShouldReRegisterNext();
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
}
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root =

TestableFederationInterceptor.java

@ -44,6 +44,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
private AtomicInteger runningIndex = new AtomicInteger(0);
private MockResourceManagerFacade mockRm;
@Override
protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
ExecutorService threadPool) {
return new TestableUnmanagedAMPoolManager(threadPool);
}
@Override
protected ApplicationMasterProtocol createHomeRMProxy(
AMRMProxyApplicationContext appContext) {