YARN-6511. Federation: transparently spanning application across multiple sub-clusters. (Botong Huang via Subru).
(cherry picked from commit8c988d235e
) (cherry picked from commit70b1a757f1
)
This commit is contained in:
parent
7ede8c1a53
commit
35a38330ef
|
@ -0,0 +1,168 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class for Federation policy.
|
||||
*/
|
||||
@Private
|
||||
public final class FederationPolicyUtils {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationPolicyUtils.class);
|
||||
|
||||
/** Disable constructor. */
|
||||
private FederationPolicyUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* A utilize method to instantiate a policy manager class given the type
|
||||
* (class name) from {@link SubClusterPolicyConfiguration}.
|
||||
*
|
||||
* @param newType class name of the policy manager to create
|
||||
* @return Policy manager
|
||||
* @throws FederationPolicyInitializationException if fails
|
||||
*/
|
||||
public static FederationPolicyManager instantiatePolicyManager(String newType)
|
||||
throws FederationPolicyInitializationException {
|
||||
FederationPolicyManager federationPolicyManager = null;
|
||||
try {
|
||||
// create policy instance and set queue
|
||||
Class<?> c = Class.forName(newType);
|
||||
federationPolicyManager = (FederationPolicyManager) c.newInstance();
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
} catch (InstantiationException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
}
|
||||
return federationPolicyManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Federation policy configuration from state store, using default queue
|
||||
* and configuration as fallback.
|
||||
*
|
||||
* @param queue the queue of the application
|
||||
* @param conf the Yarn configuration
|
||||
* @param federationFacade state store facade
|
||||
* @return SubClusterPolicyConfiguration recreated
|
||||
*/
|
||||
public static SubClusterPolicyConfiguration loadPolicyConfiguration(
|
||||
String queue, Configuration conf,
|
||||
FederationStateStoreFacade federationFacade) {
|
||||
|
||||
// The facade might cache this request, based on its parameterization
|
||||
SubClusterPolicyConfiguration configuration = null;
|
||||
if (queue != null) {
|
||||
try {
|
||||
configuration = federationFacade.getPolicyConfiguration(queue);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("Failed to get policy from FederationFacade with queue "
|
||||
+ queue + ": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// If there is no policy configured for this queue, fallback to the baseline
|
||||
// policy that is configured either in the store or via XML config
|
||||
if (configuration == null) {
|
||||
LOG.info("No policy configured for queue {} in StateStore,"
|
||||
+ " fallback to default queue", queue);
|
||||
queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
||||
try {
|
||||
configuration = federationFacade.getPolicyConfiguration(queue);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("No fallback behavior defined in store, defaulting to XML "
|
||||
+ "configuration fallback behavior.");
|
||||
}
|
||||
}
|
||||
|
||||
// or from XML conf otherwise.
|
||||
if (configuration == null) {
|
||||
LOG.info("No policy configured for default queue {} in StateStore,"
|
||||
+ " fallback to local config", queue);
|
||||
|
||||
String defaultFederationPolicyManager =
|
||||
conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
|
||||
String defaultPolicyParamString =
|
||||
conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
|
||||
ByteBuffer defaultPolicyParam = ByteBuffer
|
||||
.wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
configuration = SubClusterPolicyConfiguration.newInstance(queue,
|
||||
defaultFederationPolicyManager, defaultPolicyParam);
|
||||
}
|
||||
return configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get AMRMProxy policy from state store, using default queue and
|
||||
* configuration as fallback.
|
||||
*
|
||||
* @param queue the queue of the application
|
||||
* @param oldPolicy the previous policy instance (can be null)
|
||||
* @param conf the Yarn configuration
|
||||
* @param federationFacade state store facade
|
||||
* @param homeSubClusterId home sub-cluster id
|
||||
* @return FederationAMRMProxyPolicy recreated
|
||||
* @throws FederationPolicyInitializationException if fails
|
||||
*/
|
||||
public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
|
||||
FederationAMRMProxyPolicy oldPolicy, Configuration conf,
|
||||
FederationStateStoreFacade federationFacade,
|
||||
SubClusterId homeSubClusterId)
|
||||
throws FederationPolicyInitializationException {
|
||||
|
||||
// Local policy and its configuration
|
||||
SubClusterPolicyConfiguration configuration =
|
||||
loadPolicyConfiguration(queue, conf, federationFacade);
|
||||
|
||||
// Instantiate the policyManager and get policy
|
||||
FederationPolicyInitializationContext context =
|
||||
new FederationPolicyInitializationContext(configuration,
|
||||
federationFacade.getSubClusterResolver(), federationFacade,
|
||||
homeSubClusterId);
|
||||
|
||||
LOG.info("Creating policy manager of type: " + configuration.getType());
|
||||
FederationPolicyManager federationPolicyManager =
|
||||
instantiatePolicyManager(configuration.getType());
|
||||
// set queue, reinit policy if required (implementation lazily check
|
||||
// content of conf), and cache it
|
||||
federationPolicyManager.setQueue(configuration.getQueue());
|
||||
return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
|
||||
}
|
||||
|
||||
}
|
|
@ -95,7 +95,7 @@ public class RouterPolicyFacade {
|
|||
new FederationPolicyInitializationContext(configuration,
|
||||
subClusterResolver, federationFacade, homeSubcluster);
|
||||
FederationPolicyManager fallbackPolicyManager =
|
||||
instantiatePolicyManager(configuration.getType());
|
||||
FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
|
||||
fallbackPolicyManager.setQueue(defaulKey);
|
||||
|
||||
// add to the cache the fallback behavior
|
||||
|
@ -209,7 +209,7 @@ public class RouterPolicyFacade {
|
|||
FederationRouterPolicy routerPolicy = policyMap.get(queue);
|
||||
|
||||
FederationPolicyManager federationPolicyManager =
|
||||
instantiatePolicyManager(newType);
|
||||
FederationPolicyUtils.instantiatePolicyManager(newType);
|
||||
// set queue, reinit policy if required (implementation lazily check
|
||||
// content of conf), and cache it
|
||||
federationPolicyManager.setQueue(queue);
|
||||
|
@ -224,23 +224,6 @@ public class RouterPolicyFacade {
|
|||
}
|
||||
}
|
||||
|
||||
private static FederationPolicyManager instantiatePolicyManager(
|
||||
String newType) throws FederationPolicyInitializationException {
|
||||
FederationPolicyManager federationPolicyManager = null;
|
||||
try {
|
||||
// create policy instance and set queue
|
||||
Class c = Class.forName(newType);
|
||||
federationPolicyManager = (FederationPolicyManager) c.newInstance();
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
} catch (InstantiationException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
}
|
||||
return federationPolicyManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method flushes all cached configurations and policies. This should be
|
||||
* invoked if the facade remains activity after very large churn of queues in
|
||||
|
|
|
@ -24,7 +24,14 @@ import java.util.Collection;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -38,20 +45,35 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
|||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionContract;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
|
||||
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
import org.apache.hadoop.yarn.util.AsyncCallback;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Extends the AbstractRequestInterceptor and provides an implementation for
|
||||
* federation of YARN RM and scaling an application across multiple YARN
|
||||
|
@ -69,6 +91,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
private ApplicationMasterProtocol homeRM;
|
||||
private SubClusterId homeSubClusterId;
|
||||
|
||||
/**
|
||||
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
|
||||
* using subClusterId as uamId. One UAM is created per sub-cluster RM except
|
||||
* the home RM.
|
||||
*
|
||||
* Creation and register of UAM in secondary sub-clusters happen on-demand,
|
||||
* when AMRMProxy policy routes resource request to these sub-clusters for the
|
||||
* first time. AM heart beats to them are also handled asynchronously for
|
||||
* performance reasons.
|
||||
*/
|
||||
private UnmanagedAMPoolManager uamPool;
|
||||
|
||||
/** Thread pool used for asynchronous operations. */
|
||||
private ExecutorService threadpool;
|
||||
|
||||
/**
|
||||
* Stores the AllocateResponses that are received asynchronously from all the
|
||||
* sub-cluster resource managers except the home RM.
|
||||
*/
|
||||
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
|
||||
|
||||
/**
|
||||
* Used to keep track of the container Id and the sub cluster RM that created
|
||||
* the container, so that we know which sub-cluster to forward later requests
|
||||
|
@ -89,7 +132,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
*/
|
||||
private RegisterApplicationMasterResponse amRegistrationResponse;
|
||||
|
||||
/** The proxy ugi used to talk to home RM. */
|
||||
private FederationStateStoreFacade federationFacade;
|
||||
|
||||
private SubClusterResolver subClusterResolver;
|
||||
|
||||
/** The policy used to split requests among sub-clusters. */
|
||||
private FederationAMRMProxyPolicy policyInterpreter;
|
||||
|
||||
/**
|
||||
* The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
|
||||
* issued by home RM.
|
||||
*/
|
||||
private UserGroupInformation appOwner;
|
||||
|
||||
/**
|
||||
|
@ -97,6 +150,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
*/
|
||||
public FederationInterceptor() {
|
||||
this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
|
||||
this.asyncResponseSink = new ConcurrentHashMap<>();
|
||||
this.threadpool = Executors.newCachedThreadPool();
|
||||
this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
|
||||
this.amRegistrationRequest = null;
|
||||
this.amRegistrationResponse = null;
|
||||
}
|
||||
|
||||
|
@ -126,6 +183,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
this.homeSubClusterId =
|
||||
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
||||
this.homeRM = createHomeRMProxy(appContext);
|
||||
|
||||
this.federationFacade = FederationStateStoreFacade.getInstance();
|
||||
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
|
||||
|
||||
// AMRMProxyPolicy will be initialized in registerApplicationMaster
|
||||
this.policyInterpreter = null;
|
||||
|
||||
this.uamPool.init(conf);
|
||||
this.uamPool.start();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -202,7 +268,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
if (queue == null) {
|
||||
LOG.warn("Received null queue for application "
|
||||
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
|
||||
+ " from home subcluster. Will use default queue name "
|
||||
+ " from home sub-cluster. Will use default queue name "
|
||||
+ YarnConfiguration.DEFAULT_QUEUE_NAME
|
||||
+ " for getting AMRMProxyPolicy");
|
||||
} else {
|
||||
|
@ -211,6 +277,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
+ " belongs to queue " + queue);
|
||||
}
|
||||
|
||||
// Initialize the AMRMProxyPolicy
|
||||
try {
|
||||
this.policyInterpreter =
|
||||
FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
|
||||
getConf(), this.federationFacade, this.homeSubClusterId);
|
||||
} catch (FederationPolicyInitializationException e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
return this.amRegistrationResponse;
|
||||
}
|
||||
|
||||
|
@ -221,6 +295,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException {
|
||||
Preconditions.checkArgument(this.policyInterpreter != null,
|
||||
"Allocate should be called after registerApplicationMaster");
|
||||
|
||||
try {
|
||||
// Split the heart beat request into multiple requests, one for each
|
||||
|
@ -228,12 +304,28 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
Map<SubClusterId, AllocateRequest> requests =
|
||||
splitAllocateRequest(request);
|
||||
|
||||
// Send the requests to the secondary sub-cluster resource managers.
|
||||
// These secondary requests are send asynchronously and the responses will
|
||||
// be collected and merged with the home response. In addition, it also
|
||||
// return the newly registered Unmanaged AMs.
|
||||
Registrations newRegistrations =
|
||||
sendRequestsToSecondaryResourceManagers(requests);
|
||||
|
||||
// Send the request to the home RM and get the response
|
||||
AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
|
||||
requests.get(this.homeSubClusterId), this.homeRM,
|
||||
this.amRegistrationRequest,
|
||||
getApplicationContext().getApplicationAttemptId());
|
||||
|
||||
// Notify policy of home response
|
||||
try {
|
||||
this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
|
||||
homeResponse);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
|
||||
+ this.homeSubClusterId, e);
|
||||
}
|
||||
|
||||
// If the resource manager sent us a new token, add to the current user
|
||||
if (homeResponse.getAMRMToken() != null) {
|
||||
LOG.debug("Received new AMRMToken");
|
||||
|
@ -244,6 +336,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
// Merge the responses from home and secondary sub-cluster RMs
|
||||
homeResponse = mergeAllocateResponses(homeResponse);
|
||||
|
||||
// Merge the containers and NMTokens from the new registrations into
|
||||
// the homeResponse.
|
||||
if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
|
||||
homeResponse = mergeRegistrationResponses(homeResponse,
|
||||
newRegistrations.getSuccessfulRegistrations());
|
||||
}
|
||||
|
||||
// return the final response to the application master.
|
||||
return homeResponse;
|
||||
} catch (IOException ex) {
|
||||
|
@ -261,10 +360,83 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
FinishApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
// TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
|
||||
boolean failedToUnRegister = false;
|
||||
ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
|
||||
null;
|
||||
|
||||
// Application master is completing operation. Send the finish
|
||||
// application master request to all the registered sub-cluster resource
|
||||
// managers in parallel, wait for the responses and aggregate the results.
|
||||
Set<String> subClusterIds = this.uamPool.getAllUAMIds();
|
||||
if (subClusterIds.size() > 0) {
|
||||
final FinishApplicationMasterRequest finishRequest = request;
|
||||
compSvc =
|
||||
new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(
|
||||
this.threadpool);
|
||||
|
||||
LOG.info("Sending finish application request to {} sub-cluster RMs",
|
||||
subClusterIds.size());
|
||||
for (final String subClusterId : subClusterIds) {
|
||||
compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() {
|
||||
@Override
|
||||
public FinishApplicationMasterResponseInfo call() throws Exception {
|
||||
LOG.info("Sending finish application request to RM {}",
|
||||
subClusterId);
|
||||
FinishApplicationMasterResponse uamResponse = null;
|
||||
try {
|
||||
uamResponse =
|
||||
uamPool.finishApplicationMaster(subClusterId, finishRequest);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to finish unmanaged application master: "
|
||||
+ "RM address: " + subClusterId + " ApplicationId: "
|
||||
+ getApplicationContext().getApplicationAttemptId(), e);
|
||||
}
|
||||
return new FinishApplicationMasterResponseInfo(uamResponse,
|
||||
subClusterId);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// While the finish application request is being processed
|
||||
// asynchronously by other sub-cluster resource managers, send the same
|
||||
// request to the home resource manager on this thread.
|
||||
FinishApplicationMasterResponse homeResponse =
|
||||
AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
|
||||
this.amRegistrationRequest,
|
||||
getApplicationContext().getApplicationAttemptId());
|
||||
|
||||
if (subClusterIds.size() > 0) {
|
||||
// Wait for other sub-cluster resource managers to return the
|
||||
// response and merge it with the home response
|
||||
LOG.info(
|
||||
"Waiting for finish application response from {} sub-cluster RMs",
|
||||
subClusterIds.size());
|
||||
for (int i = 0; i < subClusterIds.size(); ++i) {
|
||||
try {
|
||||
Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
|
||||
FinishApplicationMasterResponseInfo uamResponse = future.get();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Received finish application response from RM: "
|
||||
+ uamResponse.getSubClusterId());
|
||||
}
|
||||
if (uamResponse.getResponse() == null
|
||||
|| !uamResponse.getResponse().getIsUnregistered()) {
|
||||
failedToUnRegister = true;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
failedToUnRegister = true;
|
||||
LOG.warn("Failed to finish unmanaged application master: "
|
||||
+ " ApplicationId: "
|
||||
+ getApplicationContext().getApplicationAttemptId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failedToUnRegister) {
|
||||
homeResponse.setIsUnregistered(false);
|
||||
}
|
||||
return homeResponse;
|
||||
}
|
||||
|
||||
|
@ -281,9 +453,32 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
*/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (this.uamPool != null) {
|
||||
this.uamPool.stop();
|
||||
}
|
||||
if (threadpool != null) {
|
||||
try {
|
||||
threadpool.shutdown();
|
||||
} catch (Throwable ex) {
|
||||
}
|
||||
threadpool = null;
|
||||
}
|
||||
super.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the UAM pool manager for secondary sub-clsuters. For unit test to
|
||||
* override.
|
||||
*
|
||||
* @param threadPool the thread pool to use
|
||||
* @return the UAM pool manager instance
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
|
||||
ExecutorService threadPool) {
|
||||
return new UnmanagedAMPoolManager(threadPool);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns instance of the ApplicationMasterProtocol proxy class that is used
|
||||
* to connect to the Home resource manager.
|
||||
|
@ -302,6 +497,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
}
|
||||
}
|
||||
|
||||
private SubClusterId getSubClusterForNode(String nodeName) {
|
||||
SubClusterId subClusterId = null;
|
||||
try {
|
||||
subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Failed to resolve sub-cluster for node " + nodeName
|
||||
+ ", skipping this node", e);
|
||||
return null;
|
||||
}
|
||||
if (subClusterId == null) {
|
||||
LOG.error("Failed to resolve sub-cluster for node {}, skipping this node",
|
||||
nodeName);
|
||||
return null;
|
||||
}
|
||||
return subClusterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* In federation, the heart beat request needs to be sent to all the sub
|
||||
* clusters from which the AM has requested containers. This method splits the
|
||||
|
@ -317,20 +529,39 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
|
||||
requestMap);
|
||||
|
||||
// Create heart beat request instances for all other already registered
|
||||
// sub-cluster resource managers
|
||||
Set<String> subClusterIds = this.uamPool.getAllUAMIds();
|
||||
for (String subClusterId : subClusterIds) {
|
||||
findOrCreateAllocateRequestForSubCluster(
|
||||
SubClusterId.newInstance(subClusterId), request, requestMap);
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(request.getAskList())) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
this.homeSubClusterId, request, requestMap);
|
||||
newRequest.getAskList().addAll(request.getAskList());
|
||||
// Ask the federation policy interpreter to split the ask list for
|
||||
// sending it to all the sub-cluster resource managers.
|
||||
Map<SubClusterId, List<ResourceRequest>> asks =
|
||||
splitResourceRequests(request.getAskList());
|
||||
|
||||
// Add the askLists to the corresponding sub-cluster requests.
|
||||
for (Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
entry.getKey(), request, requestMap);
|
||||
newRequest.getAskList().addAll(entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
|
||||
request.getResourceBlacklistRequest().getBlacklistAdditions())) {
|
||||
for (String resourceName : request.getResourceBlacklistRequest()
|
||||
.getBlacklistAdditions()) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
this.homeSubClusterId, request, requestMap);
|
||||
newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
|
||||
.add(resourceName);
|
||||
SubClusterId subClusterId = getSubClusterForNode(resourceName);
|
||||
if (subClusterId != null) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
subClusterId, request, requestMap);
|
||||
newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
|
||||
.add(resourceName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,10 +569,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
request.getResourceBlacklistRequest().getBlacklistRemovals())) {
|
||||
for (String resourceName : request.getResourceBlacklistRequest()
|
||||
.getBlacklistRemovals()) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
this.homeSubClusterId, request, requestMap);
|
||||
newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
|
||||
.add(resourceName);
|
||||
SubClusterId subClusterId = getSubClusterForNode(resourceName);
|
||||
if (subClusterId != null) {
|
||||
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
|
||||
subClusterId, request, requestMap);
|
||||
newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
|
||||
.add(resourceName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -370,6 +604,174 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
return requestMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* This methods sends the specified AllocateRequests to the appropriate
|
||||
* sub-cluster resource managers.
|
||||
*
|
||||
* @param requests contains the heart beat requests to send to the resource
|
||||
* manager keyed by the resource manager address
|
||||
* @return the registration responses from the newly added sub-cluster
|
||||
* resource managers
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
private Registrations sendRequestsToSecondaryResourceManagers(
|
||||
Map<SubClusterId, AllocateRequest> requests)
|
||||
throws YarnException, IOException {
|
||||
|
||||
// Create new UAM instances for the sub-cluster that we have not seen
|
||||
// before
|
||||
Registrations registrations = registerWithNewSubClusters(requests.keySet());
|
||||
|
||||
// Now that all the registrations are done, send the allocation request
|
||||
// to the sub-cluster RMs using the Unmanaged application masters
|
||||
// asynchronously and don't wait for the response. The responses will
|
||||
// arrive asynchronously and will be added to the response sink. These
|
||||
// responses will be sent to the application master in some future heart
|
||||
// beat response.
|
||||
for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
|
||||
final SubClusterId subClusterId = entry.getKey();
|
||||
|
||||
if (subClusterId.equals(this.homeSubClusterId)) {
|
||||
// Skip the request for the home sub-cluster resource manager.
|
||||
// It will be handled separately in the allocate() method
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
|
||||
// TODO: This means that the registration for this sub-cluster RM
|
||||
// failed. For now, we ignore the resource requests and continue
|
||||
// but we need to fix this and handle this situation. One way would
|
||||
// be to send the request to another RM by consulting the policy.
|
||||
LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
|
||||
subClusterId);
|
||||
continue;
|
||||
}
|
||||
|
||||
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
|
||||
new AsyncCallback<AllocateResponse>() {
|
||||
@Override
|
||||
public void callback(AllocateResponse response) {
|
||||
synchronized (asyncResponseSink) {
|
||||
List<AllocateResponse> responses = null;
|
||||
if (asyncResponseSink.containsKey(subClusterId)) {
|
||||
responses = asyncResponseSink.get(subClusterId);
|
||||
} else {
|
||||
responses = new ArrayList<>();
|
||||
asyncResponseSink.put(subClusterId, responses);
|
||||
}
|
||||
responses.add(response);
|
||||
}
|
||||
|
||||
// Notify policy of secondary sub-cluster responses
|
||||
try {
|
||||
policyInterpreter.notifyOfResponse(subClusterId, response);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn(
|
||||
"notifyOfResponse for policy failed for home sub-cluster "
|
||||
+ subClusterId,
|
||||
e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return registrations;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method ensures that Unmanaged AMs are created for each of the
|
||||
* specified sub-cluster specified in the input and registers with the
|
||||
* corresponding resource managers.
|
||||
*/
|
||||
private Registrations registerWithNewSubClusters(
|
||||
Set<SubClusterId> subClusterSet) throws IOException {
|
||||
|
||||
List<SubClusterId> failedRegistrations = new ArrayList<>();
|
||||
Map<SubClusterId, RegisterApplicationMasterResponse>
|
||||
successfulRegistrations = new HashMap<>();
|
||||
|
||||
// Check to see if there are any new sub-clusters in this request
|
||||
// list and create and register Unmanaged AM instance for the new ones
|
||||
List<String> newSubClusters = new ArrayList<>();
|
||||
for (SubClusterId subClusterId : subClusterSet) {
|
||||
if (!subClusterId.equals(this.homeSubClusterId)
|
||||
&& !this.uamPool.hasUAMId(subClusterId.getId())) {
|
||||
newSubClusters.add(subClusterId.getId());
|
||||
}
|
||||
}
|
||||
|
||||
if (newSubClusters.size() > 0) {
|
||||
final RegisterApplicationMasterRequest registerRequest =
|
||||
this.amRegistrationRequest;
|
||||
final AMRMProxyApplicationContext appContext = getApplicationContext();
|
||||
ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
|
||||
completionService = new ExecutorCompletionService<>(threadpool);
|
||||
|
||||
for (final String subClusterId : newSubClusters) {
|
||||
completionService
|
||||
.submit(new Callable<RegisterApplicationMasterResponseInfo>() {
|
||||
@Override
|
||||
public RegisterApplicationMasterResponseInfo call()
|
||||
throws Exception {
|
||||
|
||||
// Create a config loaded with federation on and subclusterId
|
||||
// for each UAM
|
||||
YarnConfiguration config = new YarnConfiguration(getConf());
|
||||
FederationProxyProviderUtil.updateConfForFederation(config,
|
||||
subClusterId);
|
||||
|
||||
RegisterApplicationMasterResponse uamResponse = null;
|
||||
try {
|
||||
// For appNameSuffix, use subClusterId of the home sub-cluster
|
||||
uamResponse = uamPool.createAndRegisterNewUAM(subClusterId,
|
||||
registerRequest, config,
|
||||
appContext.getApplicationAttemptId().getApplicationId(),
|
||||
amRegistrationResponse.getQueue(), appContext.getUser(),
|
||||
homeSubClusterId.toString());
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Failed to register application master: "
|
||||
+ subClusterId + " Application: "
|
||||
+ appContext.getApplicationAttemptId(), e);
|
||||
}
|
||||
return new RegisterApplicationMasterResponseInfo(uamResponse,
|
||||
SubClusterId.newInstance(subClusterId));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for other sub-cluster resource managers to return the
|
||||
// response and add it to the Map for returning to the caller
|
||||
for (int i = 0; i < newSubClusters.size(); ++i) {
|
||||
try {
|
||||
Future<RegisterApplicationMasterResponseInfo> future =
|
||||
completionService.take();
|
||||
RegisterApplicationMasterResponseInfo uamResponse = future.get();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Received register application response from RM: "
|
||||
+ uamResponse.getSubClusterId());
|
||||
}
|
||||
|
||||
if (uamResponse.getResponse() == null) {
|
||||
failedRegistrations.add(uamResponse.getSubClusterId());
|
||||
} else {
|
||||
LOG.info("Successfully registered unmanaged application master: "
|
||||
+ uamResponse.getSubClusterId() + " ApplicationId: "
|
||||
+ getApplicationContext().getApplicationAttemptId());
|
||||
successfulRegistrations.put(uamResponse.getSubClusterId(),
|
||||
uamResponse.getResponse());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to register unmanaged application master: "
|
||||
+ " ApplicationId: "
|
||||
+ getApplicationContext().getApplicationAttemptId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new Registrations(successfulRegistrations, failedRegistrations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges the responses from other sub-clusters that we received
|
||||
* asynchronously with the specified home cluster response and keeps track of
|
||||
|
@ -388,6 +790,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
|
||||
this.homeSubClusterId);
|
||||
|
||||
synchronized (this.asyncResponseSink) {
|
||||
for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
|
||||
.entrySet()) {
|
||||
SubClusterId subClusterId = entry.getKey();
|
||||
List<AllocateResponse> responses = entry.getValue();
|
||||
if (responses.size() > 0) {
|
||||
for (AllocateResponse response : responses) {
|
||||
removeFinishedContainersFromCache(
|
||||
response.getCompletedContainersStatuses());
|
||||
cacheAllocatedContainers(response.getAllocatedContainers(),
|
||||
subClusterId);
|
||||
mergeAllocateResponse(homeResponse, response, subClusterId);
|
||||
}
|
||||
responses.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return homeResponse;
|
||||
}
|
||||
|
||||
|
@ -404,6 +824,130 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method for merging the responses from the secondary sub cluster RMs
|
||||
* with the home response to return to the AM.
|
||||
*/
|
||||
private AllocateResponse mergeRegistrationResponses(
|
||||
AllocateResponse homeResponse,
|
||||
Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
|
||||
|
||||
for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
|
||||
registrations.entrySet()) {
|
||||
RegisterApplicationMasterResponse registration = entry.getValue();
|
||||
|
||||
if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
|
||||
List<Container> tempContainers = homeResponse.getAllocatedContainers();
|
||||
if (!isNullOrEmpty(tempContainers)) {
|
||||
tempContainers
|
||||
.addAll(registration.getContainersFromPreviousAttempts());
|
||||
homeResponse.setAllocatedContainers(tempContainers);
|
||||
} else {
|
||||
homeResponse.setAllocatedContainers(
|
||||
registration.getContainersFromPreviousAttempts());
|
||||
}
|
||||
cacheAllocatedContainers(
|
||||
registration.getContainersFromPreviousAttempts(), entry.getKey());
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) {
|
||||
List<NMToken> tempTokens = homeResponse.getNMTokens();
|
||||
if (!isNullOrEmpty(tempTokens)) {
|
||||
tempTokens.addAll(registration.getNMTokensFromPreviousAttempts());
|
||||
homeResponse.setNMTokens(tempTokens);
|
||||
} else {
|
||||
homeResponse
|
||||
.setNMTokens(registration.getNMTokensFromPreviousAttempts());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return homeResponse;
|
||||
}
|
||||
|
||||
private void mergeAllocateResponse(AllocateResponse homeResponse,
|
||||
AllocateResponse otherResponse, SubClusterId otherRMAddress) {
|
||||
|
||||
if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
|
||||
if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
|
||||
homeResponse.getAllocatedContainers()
|
||||
.addAll(otherResponse.getAllocatedContainers());
|
||||
} else {
|
||||
homeResponse
|
||||
.setAllocatedContainers(otherResponse.getAllocatedContainers());
|
||||
}
|
||||
}
|
||||
|
||||
if (otherResponse.getAvailableResources() != null) {
|
||||
if (homeResponse.getAvailableResources() != null) {
|
||||
homeResponse.setAvailableResources(
|
||||
Resources.add(homeResponse.getAvailableResources(),
|
||||
otherResponse.getAvailableResources()));
|
||||
} else {
|
||||
homeResponse
|
||||
.setAvailableResources(otherResponse.getAvailableResources());
|
||||
}
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
|
||||
if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
|
||||
homeResponse.getCompletedContainersStatuses()
|
||||
.addAll(otherResponse.getCompletedContainersStatuses());
|
||||
} else {
|
||||
homeResponse.setCompletedContainersStatuses(
|
||||
otherResponse.getCompletedContainersStatuses());
|
||||
}
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) {
|
||||
if (!isNullOrEmpty(homeResponse.getUpdatedNodes())) {
|
||||
homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
|
||||
} else {
|
||||
homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
|
||||
}
|
||||
}
|
||||
|
||||
if (!isNullOrEmpty(otherResponse.getNMTokens())) {
|
||||
if (!isNullOrEmpty(homeResponse.getNMTokens())) {
|
||||
homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
|
||||
} else {
|
||||
homeResponse.setNMTokens(otherResponse.getNMTokens());
|
||||
}
|
||||
}
|
||||
|
||||
PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
|
||||
PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
|
||||
|
||||
if (homePreempMessage == null && otherPreempMessage != null) {
|
||||
homeResponse.setPreemptionMessage(otherPreempMessage);
|
||||
}
|
||||
|
||||
if (homePreempMessage != null && otherPreempMessage != null) {
|
||||
PreemptionContract par1 = homePreempMessage.getContract();
|
||||
PreemptionContract par2 = otherPreempMessage.getContract();
|
||||
|
||||
if (par1 == null && par2 != null) {
|
||||
homePreempMessage.setContract(par2);
|
||||
}
|
||||
|
||||
if (par1 != null && par2 != null) {
|
||||
par1.getResourceRequest().addAll(par2.getResourceRequest());
|
||||
par2.getContainers().addAll(par2.getContainers());
|
||||
}
|
||||
|
||||
StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();
|
||||
StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract();
|
||||
|
||||
if (spar1 == null && spar2 != null) {
|
||||
homePreempMessage.setStrictContract(spar2);
|
||||
}
|
||||
|
||||
if (spar1 != null && spar2 != null) {
|
||||
spar1.getContainers().addAll(spar2.getContainers());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add allocated containers to cache mapping.
|
||||
*/
|
||||
|
@ -418,10 +962,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
// container allocation more than once. Just move on in this case.
|
||||
LOG.warn(
|
||||
"Duplicate containerID: {} found in the allocated containers"
|
||||
+ " from same subcluster: {}, so ignoring.",
|
||||
+ " from same sub-cluster: {}, so ignoring.",
|
||||
container.getId(), subClusterId);
|
||||
} else {
|
||||
// The same container allocation from different subclusters,
|
||||
// The same container allocation from different sub-clusters,
|
||||
// something is wrong.
|
||||
// TODO: YARN-6667 if some subcluster RM is configured wrong, we
|
||||
// should not fail the entire heartbeat.
|
||||
|
@ -432,7 +976,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
+ " ApplicationId: "
|
||||
+ getApplicationContext().getApplicationAttemptId()
|
||||
+ " From RM: " + subClusterId
|
||||
+ " . Previous container was from subcluster: "
|
||||
+ " . Previous container was from sub-cluster: "
|
||||
+ existingSubClusterId);
|
||||
}
|
||||
}
|
||||
|
@ -498,7 +1042,102 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Utility method to check if the specified Collection is null or empty
|
||||
* Splits the specified request to send it to different sub clusters. The
|
||||
* splitting algorithm is very simple. If the request does not have a node
|
||||
* preference, the policy decides the sub cluster. If the request has a node
|
||||
* preference and if locality is required, then it is sent to the sub cluster
|
||||
* that contains the requested node. If node preference is specified and
|
||||
* locality is not required, then the policy decides the sub cluster.
|
||||
*
|
||||
* @param askList the ask list to split
|
||||
* @return the split asks
|
||||
* @throws YarnException if split fails
|
||||
*/
|
||||
protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
|
||||
List<ResourceRequest> askList) throws YarnException {
|
||||
return this.policyInterpreter.splitResourceRequests(askList);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getUnmanagedAMPoolSize() {
|
||||
return this.uamPool.getAllUAMIds().size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Private structure for encapsulating SubClusterId and
|
||||
* RegisterApplicationMasterResponse instances.
|
||||
*/
|
||||
private static class RegisterApplicationMasterResponseInfo {
|
||||
private RegisterApplicationMasterResponse response;
|
||||
private SubClusterId subClusterId;
|
||||
|
||||
RegisterApplicationMasterResponseInfo(
|
||||
RegisterApplicationMasterResponse response, SubClusterId subClusterId) {
|
||||
this.response = response;
|
||||
this.subClusterId = subClusterId;
|
||||
}
|
||||
|
||||
public RegisterApplicationMasterResponse getResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
public SubClusterId getSubClusterId() {
|
||||
return subClusterId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Private structure for encapsulating SubClusterId and
|
||||
* FinishApplicationMasterResponse instances.
|
||||
*/
|
||||
private static class FinishApplicationMasterResponseInfo {
|
||||
private FinishApplicationMasterResponse response;
|
||||
private String subClusterId;
|
||||
|
||||
FinishApplicationMasterResponseInfo(
|
||||
FinishApplicationMasterResponse response, String subClusterId) {
|
||||
this.response = response;
|
||||
this.subClusterId = subClusterId;
|
||||
}
|
||||
|
||||
public FinishApplicationMasterResponse getResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
public String getSubClusterId() {
|
||||
return subClusterId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Private structure for encapsulating successful and failed application
|
||||
* master registration responses.
|
||||
*/
|
||||
private static class Registrations {
|
||||
private Map<SubClusterId, RegisterApplicationMasterResponse>
|
||||
successfulRegistrations;
|
||||
private List<SubClusterId> failedRegistrations;
|
||||
|
||||
Registrations(
|
||||
Map<SubClusterId, RegisterApplicationMasterResponse>
|
||||
successfulRegistrations,
|
||||
List<SubClusterId> failedRegistrations) {
|
||||
this.successfulRegistrations = successfulRegistrations;
|
||||
this.failedRegistrations = failedRegistrations;
|
||||
}
|
||||
|
||||
public Map<SubClusterId, RegisterApplicationMasterResponse>
|
||||
getSuccessfulRegistrations() {
|
||||
return this.successfulRegistrations;
|
||||
}
|
||||
|
||||
public List<SubClusterId> getFailedRegistrations() {
|
||||
return this.failedRegistrations;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to check if the specified Collection is null or empty.
|
||||
*
|
||||
* @param c the collection object
|
||||
* @param <T> element type of the collection
|
||||
|
@ -507,4 +1146,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
public static <T> boolean isNullOrEmpty(Collection<T> c) {
|
||||
return (c == null || c.size() == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to check if the specified Collection is null or empty.
|
||||
*
|
||||
* @param c the map object
|
||||
* @param <T1> key type of the map
|
||||
* @param <T2> value type of the map
|
||||
* @return whether is it is null or empty
|
||||
*/
|
||||
public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
|
||||
return (c == null || c.size() == 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,13 +19,31 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -45,6 +63,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|||
public static final String HOME_SC_ID = "SC-home";
|
||||
|
||||
private TestableFederationInterceptor interceptor;
|
||||
private MemoryFederationStateStore stateStore;
|
||||
|
||||
private int testAppId;
|
||||
private ApplicationAttemptId attemptId;
|
||||
|
@ -54,6 +73,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|||
super.setUp();
|
||||
interceptor = new TestableFederationInterceptor();
|
||||
|
||||
stateStore = new MemoryFederationStateStore();
|
||||
stateStore.init(getConf());
|
||||
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
|
||||
getConf());
|
||||
|
||||
testAppId = 1;
|
||||
attemptId = getApplicationAttemptId(testAppId);
|
||||
interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
|
||||
|
@ -82,11 +106,238 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
|
|||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||
+ "," + TestableFederationInterceptor.class.getName());
|
||||
|
||||
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
|
||||
UniformBroadcastPolicyManager.class.getName());
|
||||
|
||||
conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
|
||||
|
||||
// Disable StateStoreFacade cache
|
||||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
private void registerSubCluster(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
stateStore
|
||||
.registerSubCluster(SubClusterRegisterRequest.newInstance(SubClusterInfo
|
||||
.newInstance(subClusterId, "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3",
|
||||
"1.2.3.4:4", SubClusterState.SC_RUNNING, 0, "capacity")));
|
||||
}
|
||||
|
||||
private void deRegisterSubCluster(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
stateStore.deregisterSubCluster(SubClusterDeregisterRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
|
||||
}
|
||||
|
||||
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
|
||||
int numberOfAllocationExcepted) throws Exception {
|
||||
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setResponseId(1);
|
||||
|
||||
List<Container> containers =
|
||||
new ArrayList<Container>(numberOfResourceRequests);
|
||||
List<ResourceRequest> askList =
|
||||
new ArrayList<ResourceRequest>(numberOfResourceRequests);
|
||||
for (int id = 0; id < numberOfResourceRequests; id++) {
|
||||
askList.add(createResourceRequest("test-node-" + Integer.toString(id),
|
||||
6000, 2, id % 5, 1));
|
||||
}
|
||||
|
||||
allocateRequest.setAskList(askList);
|
||||
|
||||
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
|
||||
Assert.assertNotNull("allocate() returned null response", allocateResponse);
|
||||
|
||||
containers.addAll(allocateResponse.getAllocatedContainers());
|
||||
LOG.info("Number of allocated containers in the original request: "
|
||||
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
|
||||
|
||||
// Send max 10 heart beats to receive all the containers. If not, we will
|
||||
// fail the test
|
||||
int numHeartbeat = 0;
|
||||
while (containers.size() < numberOfAllocationExcepted
|
||||
&& numHeartbeat++ < 10) {
|
||||
allocateResponse =
|
||||
interceptor.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNotNull("allocate() returned null response",
|
||||
allocateResponse);
|
||||
|
||||
containers.addAll(allocateResponse.getAllocatedContainers());
|
||||
|
||||
LOG.info("Number of allocated containers in this request: "
|
||||
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
|
||||
LOG.info("Total number of allocated containers: "
|
||||
+ Integer.toString(containers.size()));
|
||||
Thread.sleep(10);
|
||||
}
|
||||
Assert.assertEquals(numberOfAllocationExcepted, containers.size());
|
||||
return containers;
|
||||
}
|
||||
|
||||
private void releaseContainersAndAssert(List<Container> containers)
|
||||
throws Exception {
|
||||
Assert.assertTrue(containers.size() > 0);
|
||||
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setResponseId(1);
|
||||
|
||||
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
|
||||
for (Container container : containers) {
|
||||
relList.add(container.getId());
|
||||
}
|
||||
|
||||
allocateRequest.setReleaseList(relList);
|
||||
|
||||
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
|
||||
Assert.assertNotNull(allocateResponse);
|
||||
|
||||
// The way the mock resource manager is setup, it will return the containers
|
||||
// that were released in the allocated containers. The release request will
|
||||
// be split and handled by the corresponding UAM. The release containers
|
||||
// returned by the mock resource managers will be aggregated and returned
|
||||
// back to us and we can check if total request size and returned size are
|
||||
// the same
|
||||
List<Container> containersForReleasedContainerIds =
|
||||
new ArrayList<Container>();
|
||||
containersForReleasedContainerIds
|
||||
.addAll(allocateResponse.getAllocatedContainers());
|
||||
LOG.info("Number of containers received in the original request: "
|
||||
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
|
||||
|
||||
// Send max 10 heart beats to receive all the containers. If not, we will
|
||||
// fail the test
|
||||
int numHeartbeat = 0;
|
||||
while (containersForReleasedContainerIds.size() < relList.size()
|
||||
&& numHeartbeat++ < 10) {
|
||||
allocateResponse =
|
||||
interceptor.allocate(Records.newRecord(AllocateRequest.class));
|
||||
Assert.assertNotNull(allocateResponse);
|
||||
containersForReleasedContainerIds
|
||||
.addAll(allocateResponse.getAllocatedContainers());
|
||||
|
||||
LOG.info("Number of containers received in this request: "
|
||||
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
|
||||
LOG.info("Total number of containers received: "
|
||||
+ Integer.toString(containersForReleasedContainerIds.size()));
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
||||
Assert.assertEquals(relList.size(),
|
||||
containersForReleasedContainerIds.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleSubClusters() throws Exception {
|
||||
|
||||
// Register the application
|
||||
RegisterApplicationMasterRequest registerReq =
|
||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
registerReq.setHost(Integer.toString(testAppId));
|
||||
registerReq.setRpcPort(testAppId);
|
||||
registerReq.setTrackingUrl("");
|
||||
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
interceptor.registerApplicationMaster(registerReq);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
|
||||
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
|
||||
|
||||
// Allocate the first batch of containers, with sc1 and sc2 active
|
||||
registerSubCluster(SubClusterId.newInstance("SC-1"));
|
||||
registerSubCluster(SubClusterId.newInstance("SC-2"));
|
||||
|
||||
int numberOfContainers = 3;
|
||||
List<Container> containers =
|
||||
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
|
||||
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
|
||||
|
||||
// Allocate the second batch of containers, with sc1 and sc3 active
|
||||
deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
|
||||
registerSubCluster(SubClusterId.newInstance("SC-3"));
|
||||
|
||||
numberOfContainers = 1;
|
||||
containers.addAll(
|
||||
getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
|
||||
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
|
||||
|
||||
// Allocate the third batch of containers with only in home sub-cluster
|
||||
// active
|
||||
deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
|
||||
deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
|
||||
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
|
||||
|
||||
numberOfContainers = 2;
|
||||
containers.addAll(
|
||||
getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
|
||||
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
|
||||
|
||||
// Release all containers
|
||||
releaseContainersAndAssert(containers);
|
||||
|
||||
// Finish the application
|
||||
FinishApplicationMasterRequest finishReq =
|
||||
Records.newRecord(FinishApplicationMasterRequest.class);
|
||||
finishReq.setDiagnostics("");
|
||||
finishReq.setTrackingUrl("");
|
||||
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
||||
|
||||
FinishApplicationMasterResponse finshResponse =
|
||||
interceptor.finishApplicationMaster(finishReq);
|
||||
Assert.assertNotNull(finshResponse);
|
||||
Assert.assertEquals(true, finshResponse.getIsUnregistered());
|
||||
}
|
||||
|
||||
/*
|
||||
* Test re-register when RM fails over.
|
||||
*/
|
||||
@Test
|
||||
public void testReregister() throws Exception {
|
||||
|
||||
// Register the application
|
||||
RegisterApplicationMasterRequest registerReq =
|
||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
registerReq.setHost(Integer.toString(testAppId));
|
||||
registerReq.setRpcPort(testAppId);
|
||||
registerReq.setTrackingUrl("");
|
||||
|
||||
RegisterApplicationMasterResponse registerResponse =
|
||||
interceptor.registerApplicationMaster(registerReq);
|
||||
Assert.assertNotNull(registerResponse);
|
||||
|
||||
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
|
||||
|
||||
// Allocate the first batch of containers
|
||||
registerSubCluster(SubClusterId.newInstance("SC-1"));
|
||||
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
|
||||
|
||||
interceptor.setShouldReRegisterNext();
|
||||
|
||||
int numberOfContainers = 3;
|
||||
List<Container> containers =
|
||||
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
|
||||
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
|
||||
|
||||
interceptor.setShouldReRegisterNext();
|
||||
|
||||
// Release all containers
|
||||
releaseContainersAndAssert(containers);
|
||||
|
||||
interceptor.setShouldReRegisterNext();
|
||||
|
||||
// Finish the application
|
||||
FinishApplicationMasterRequest finishReq =
|
||||
Records.newRecord(FinishApplicationMasterRequest.class);
|
||||
finishReq.setDiagnostics("");
|
||||
finishReq.setTrackingUrl("");
|
||||
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
||||
|
||||
FinishApplicationMasterResponse finshResponse =
|
||||
interceptor.finishApplicationMaster(finishReq);
|
||||
Assert.assertNotNull(finshResponse);
|
||||
Assert.assertEquals(true, finshResponse.getIsUnregistered());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestInterceptorChainCreation() throws Exception {
|
||||
RequestInterceptor root =
|
||||
|
|
|
@ -44,6 +44,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
|
|||
private AtomicInteger runningIndex = new AtomicInteger(0);
|
||||
private MockResourceManagerFacade mockRm;
|
||||
|
||||
@Override
|
||||
protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
|
||||
ExecutorService threadPool) {
|
||||
return new TestableUnmanagedAMPoolManager(threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ApplicationMasterProtocol createHomeRMProxy(
|
||||
AMRMProxyApplicationContext appContext) {
|
||||
|
|
Loading…
Reference in New Issue