YARN-11235. Refactor Policy Code and Define getReservationHomeSubcluster (#4656)

This commit is contained in:
slfan1989 2022-08-05 01:16:08 +08:00 committed by GitHub
parent 0aa08ef543
commit 6f7c4c74ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 454 additions and 344 deletions

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -188,8 +188,8 @@ public final class FederationPolicyUtils {
* @throws FederationPolicyException if there are no usable subclusters.
*/
public static void validateSubClusterAvailability(
List<SubClusterId> activeSubClusters,
List<SubClusterId> blackListSubClusters)
Collection<SubClusterId> activeSubClusters,
Collection<SubClusterId> blackListSubClusters)
throws FederationPolicyException {
if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
if (blackListSubClusters == null) {

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@ -136,7 +137,7 @@ public class RouterPolicyFacade {
if (appSubmissionContext == null) {
throw new FederationPolicyException(
"The ApplicationSubmissionContext " + "cannot be null.");
"The ApplicationSubmissionContext cannot be null.");
}
String queue = appSubmissionContext.getQueue();
@ -148,51 +149,7 @@ public class RouterPolicyFacade {
queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
}
// the facade might cache this request, based on its parameterization
SubClusterPolicyConfiguration configuration = null;
try {
configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) {
String errMsg = "There is no policy configured for the queue: " + queue
+ ", falling back to defaults.";
LOG.warn(errMsg, e);
}
// If there is no policy configured for this queue, fallback to the baseline
// policy that is configured either in the store or via XML config (and
// cached)
if (configuration == null) {
LOG.warn("There is no policies configured for queue: " + queue + " we"
+ " fallback to default policy for: "
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
try {
configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) {
String errMsg = "Cannot retrieve policy configured for the queue: "
+ queue + ", falling back to defaults.";
LOG.warn(errMsg, e);
}
}
// the fallback is not configure via store, but via XML, using
// previously loaded configuration.
if (configuration == null) {
configuration =
cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
}
// if the configuration has changed since last loaded, reinit the policy
// based on current configuration
if (!cachedConfs.containsKey(queue)
|| !cachedConfs.get(queue).equals(configuration)) {
singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
}
FederationRouterPolicy policy = policyMap.get(queue);
FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue);
if (policy == null) {
// this should never happen, as the to maps are updated together
throw new FederationPolicyException("No FederationRouterPolicy found "
@ -262,4 +219,92 @@ public class RouterPolicyFacade {
}
/**
* This method provides a wrapper of all policy functionalities for routing a
* reservation. Internally it manages configuration changes, and policy
* init/reinit.
*
* @param request the reservation to route.
*
* @return the id of the subcluster that will be the "home" for this
* reservation.
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this reservation.
*/
public SubClusterId getReservationHomeSubCluster(
ReservationSubmissionRequest request) throws YarnException {
// the maps are concurrent, but we need to protect from reset()
// reinitialization mid-execution by creating a new reference local to this
// method.
Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap;
Map<String, FederationRouterPolicy> policyMap = globalPolicyMap;
if (request == null) {
throw new FederationPolicyException(
"The ReservationSubmissionRequest cannot be null.");
}
String queue = request.getQueue();
FederationRouterPolicy policy = getFederationRouterPolicy(cachedConfs, policyMap, queue);
if (policy == null) {
// this should never happen, as the to maps are updated together
throw new FederationPolicyException("No FederationRouterPolicy found "
+ "for queue: " + request.getQueue() + " (while routing "
+ "reservation: " + request.getReservationId() + ") "
+ "and no default specified.");
}
return policy.getReservationHomeSubcluster(request);
}
private FederationRouterPolicy getFederationRouterPolicy(
Map<String, SubClusterPolicyConfiguration> cachedConfiguration,
Map<String, FederationRouterPolicy> policyMap, String queue)
throws FederationPolicyInitializationException {
// the facade might cache this request, based on its parameterization
SubClusterPolicyConfiguration configuration = null;
String copyQueue = queue;
try {
configuration = federationFacade.getPolicyConfiguration(copyQueue);
} catch (YarnException e) {
LOG.warn("There is no policy configured for the queue: {}, falling back to defaults.",
copyQueue, e);
}
// If there is no policy configured for this queue, fallback to the baseline
// policy that is configured either in the store or via XML config (and cached)
if (configuration == null) {
final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
LOG.warn("There is no policies configured for queue: {} " +
"we fallback to default policy for: {}. ", copyQueue, policyKey);
copyQueue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
try {
configuration = federationFacade.getPolicyConfiguration(copyQueue);
} catch (YarnException e) {
LOG.warn("Cannot retrieve policy configured for the queue: {}, falling back to defaults.",
copyQueue, e);
}
}
// the fallback is not configure via store, but via XML, using
// previously loaded configuration.
if (configuration == null) {
configuration = cachedConfiguration.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
}
// if the configuration has changed since last loaded, reinit the policy
// based on current configuration
SubClusterPolicyConfiguration policyConfiguration =
cachedConfiguration.getOrDefault(copyQueue, null);
if (policyConfiguration == null || !policyConfiguration.equals(configuration)) {
singlePolicyReinit(policyMap, cachedConfiguration, copyQueue, configuration);
}
return policyMap.get(copyQueue);
}
}

View File

@ -18,15 +18,22 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* Base abstract class for {@link FederationRouterPolicy} implementations, that
@ -63,4 +70,107 @@ public abstract class AbstractRouterPolicy extends
}
}
/**
* This method is implemented by the specific policy, and it is used to route
* both reservations, and applications among a given set of
* sub-clusters.
*
* @param queue the queue for this application/reservation
* @param preSelectSubClusters a pre-filter set of sub-clusters
* @return the chosen sub-cluster
*
* @throws YarnException if the policy fails to choose a sub-cluster
*/
protected abstract SubClusterId chooseSubCluster(String queue,
Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException;
/**
* Filter chosen SubCluster based on reservationId.
*
* @param reservationId the globally unique identifier for a reservation.
* @param activeSubClusters the map of ids to info for all active subclusters.
* @return the chosen sub-cluster
* @throws YarnException if the policy fails to choose a sub-cluster
*/
protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
ReservationId reservationId, Map<SubClusterId, SubClusterInfo> activeSubClusters)
throws YarnException {
// if a reservation exists limit scope to the sub-cluster this
// reservation is mapped to
// TODO: Implemented in YARN-11236
return activeSubClusters;
}
/**
* Simply picks from alphabetically-sorted active subclusters based on the
* hash of quey name. Jobs of the same queue will all be routed to the same
* sub-cluster, as far as the number of active sub-cluster and their names
* remain the same.
*
* @param appContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @param blackLists the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return a hash-based chosen {@link SubClusterId} that will be the "home"
* for this application.
*
* @throws YarnException if there are no active subclusters.
*/
@Override
public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,
List<SubClusterId> blackLists) throws YarnException {
// null checks and default-queue behavior
validate(appContext);
// apply filtering based on reservation location and active sub-clusters
Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters(
appContext.getReservationID(), getActiveSubclusters());
FederationPolicyUtils.validateSubClusterAvailability(filteredSubClusters.keySet(), blackLists);
// remove black SubCluster
if (blackLists != null) {
blackLists.forEach(filteredSubClusters::remove);
}
// pick the chosen subCluster from the active ones
return chooseSubCluster(appContext.getQueue(), filteredSubClusters);
}
/**
* This method provides a wrapper of all policy functionalities for routing a
* reservation. Internally it manages configuration changes, and policy
* init/reinit.
*
* @param request the reservation to route.
*
* @return the id of the subcluster that will be the "home" for this
* reservation.
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this reservation.
*/
@Override
public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request)
throws YarnException {
if (request == null) {
throw new FederationPolicyException("The ReservationSubmissionRequest cannot be null.");
}
if (request.getQueue() == null) {
request.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
// apply filtering based on reservation location and active sub-clusters
Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters(
request.getReservationId(), getActiveSubclusters());
// pick the chosen subCluster from the active ones
return chooseSubCluster(request.getQueue(), filteredSubClusters);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@ -49,4 +50,16 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException;
/**
* Determines the sub-cluster where a ReservationSubmissionRequest should be
* sent to.
*
* @param request the original request
* @return a mapping of sub-clusters and the requests
*
* @throws YarnException if the policy fails to choose a sub-cluster
*/
SubClusterId getReservationHomeSubcluster(
ReservationSubmissionRequest request) throws YarnException;
}

View File

@ -22,11 +22,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -50,53 +48,12 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
setPolicyContext(federationPolicyContext);
}
/**
* Simply picks from alphabetically-sorted active subclusters based on the
* hash of quey name. Jobs of the same queue will all be routed to the same
* sub-cluster, as far as the number of active sub-cluster and their names
* remain the same.
*
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return a hash-based chosen {@link SubClusterId} that will be the "home"
* for this application.
*
* @throws YarnException if there are no active subclusters.
*/
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// throws if no active subclusters available
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()),
blackListSubClusters);
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
for (SubClusterId scId : blackListSubClusters) {
activeSubclusters.remove(scId);
}
}
validate(appSubmissionContext);
int chosenPosition = Math.abs(
appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
protected SubClusterId chooseSubCluster(String queue,
Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
int chosenPosition = Math.abs(queue.hashCode() % preSelectSubclusters.size());
List<SubClusterId> list = new ArrayList<>(preSelectSubclusters.keySet());
Collections.sort(list);
return list.get(chosenPosition);
}
}

View File

@ -17,14 +17,10 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@ -65,28 +61,12 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
}
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
SubClusterIdInfo chosen = null;
long currBestMem = -1;
for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
.entrySet()) {
if (blacklist != null && blacklist.contains(entry.getKey())) {
continue;
}
for (Map.Entry<SubClusterId, SubClusterInfo> entry : preSelectSubclusters.entrySet()) {
SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
if (weights.containsKey(id) && weights.get(id) > 0) {
long availableMemory = getAvailableMemory(entry.getValue());
@ -110,7 +90,7 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
mem = obj.getJSONObject("clusterMetrics").getLong("availableMB");
return mem;
} catch (JSONException j) {
throw new YarnException("FederationSubCluserInfo cannot be parsed", j);
throw new YarnException("FederationSubClusterInfo cannot be parsed", j);
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -78,7 +79,7 @@ public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
resolver = policyContext.getFederationSubclusterResolver();
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
enabledSCs = new ArrayList<SubClusterId>();
enabledSCs = new ArrayList<>();
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
if (entry != null && entry.getValue() > 0) {
enabledSCs.add(entry.getKey().toId());
@ -100,8 +101,7 @@ public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
// Fast path for FailForward to WeightedRandomRouterPolicy
if (rrList == null || rrList.isEmpty() || (rrList.size() == 1
&& ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) {
return super
.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
}
if (rrList.size() != 3) {
@ -109,12 +109,11 @@ public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
"Invalid number of resource requests: " + rrList.size());
}
Map<SubClusterId, SubClusterInfo> activeSubClusters =
getActiveSubclusters();
List<SubClusterId> validSubClusters =
new ArrayList<>(activeSubClusters.keySet());
FederationPolicyUtils
.validateSubClusterAvailability(validSubClusters, blackListSubClusters);
Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
Set<SubClusterId> validSubClusters = activeSubClusters.keySet();
FederationPolicyUtils.validateSubClusterAvailability(activeSubClusters.keySet(),
blackListSubClusters);
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
validSubClusters.removeAll(blackListSubClusters);
@ -128,20 +127,21 @@ public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
ResourceRequest nodeRequest = null;
ResourceRequest rackRequest = null;
ResourceRequest anyRequest = null;
for (ResourceRequest rr : rrList) {
// Handle "node" requests
try {
targetId = resolver.getSubClusterForNode(rr.getResourceName());
nodeRequest = rr;
} catch (YarnException e) {
LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
LOG.error("Cannot resolve node : {}.", e.getMessage());
}
// Handle "rack" requests
try {
resolver.getSubClustersForRack(rr.getResourceName());
rackRequest = rr;
} catch (YarnException e) {
LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage());
LOG.error("Cannot resolve rack : {}.", e.getMessage());
}
// Handle "ANY" requests
if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
@ -149,32 +149,33 @@ public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
continue;
}
}
if (nodeRequest == null) {
throw new YarnException("Missing node request");
throw new YarnException("Missing node request.");
}
if (rackRequest == null) {
throw new YarnException("Missing rack request");
throw new YarnException("Missing rack request.");
}
if (anyRequest == null) {
throw new YarnException("Missing any request");
throw new YarnException("Missing any request.");
}
LOG.info(
"Node request: " + nodeRequest.getResourceName() + ", Rack request: "
+ rackRequest.getResourceName() + ", Any request: " + anyRequest
.getResourceName());
LOG.info("Node request: {} , Rack request: {} , Any request: {}.",
nodeRequest.getResourceName(), rackRequest.getResourceName(),
anyRequest.getResourceName());
// Handle "node" requests
if (validSubClusters.contains(targetId) && enabledSCs
.contains(targetId)) {
LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(),
targetId);
LOG.info("Node {} is in SubCluster: {}.", nodeRequest.getResourceName(), targetId);
return targetId;
} else {
throw new YarnException("The node " + nodeRequest.getResourceName()
+ " is in a blacklist SubCluster or not active. ");
}
} catch (YarnException e) {
LOG.error("Validating resource requests failed, Falling back to "
+ "WeightedRandomRouterPolicy placement: " + e.getMessage());
LOG.error("Validating resource requests failed, " +
"Falling back to WeightedRandomRouterPolicy placement : {}.", e.getMessage());
// FailForward to WeightedRandomRouterPolicy
// Overwrite request to use a default ANY
ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
@ -183,14 +184,10 @@ public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {
amReq.setCapability(appSubmissionContext.getResource());
amReq.setNumContainers(1);
amReq.setRelaxLocality(true);
amReq.setNodeLabelExpression(
appSubmissionContext.getNodeLabelExpression());
amReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
appSubmissionContext
.setAMContainerResourceRequests(Collections.singletonList(amReq));
return super
.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
amReq.setNodeLabelExpression(appSubmissionContext.getNodeLabelExpression());
amReq.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
appSubmissionContext.setAMContainerResourceRequests(Collections.singletonList(amReq));
return super.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
}
}
}

View File

@ -17,13 +17,9 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -37,30 +33,15 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
public class PriorityRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
// This finds the sub-cluster with the highest weight among the
// currently active ones.
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
SubClusterId chosen = null;
Float currentBest = Float.MIN_VALUE;
for (SubClusterId id : activeSubclusters.keySet()) {
for (SubClusterId id : preSelectSubclusters.keySet()) {
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (blacklist != null && blacklist.contains(id)) {
continue;
}
if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
currentBest = weights.get(idInfo);
chosen = id;
@ -68,10 +49,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
}
if (chosen == null) {
throw new FederationPolicyException(
"No Active Subcluster with weight vector greater than zero");
"No Active Subcluster with weight vector greater than zero.");
}
return chosen;
}
}

View File

@ -17,15 +17,15 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* This {@link FederationRouterPolicy} simply rejects all incoming requests.
@ -43,34 +43,12 @@ public class RejectRouterPolicy extends AbstractRouterPolicy {
setPolicyContext(federationPolicyContext);
}
/**
* The policy always reject requests.
*
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return (never).
*
* @throws YarnException (always) to prevent applications in this queue to be
* run anywhere in the federated cluster.
*/
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// run standard validation, as error might differ
validate(appSubmissionContext);
throw new FederationPolicyException("The policy configured for this queue"
+ " (" + appSubmissionContext.getQueue() + ") reject all routing "
+ "requests by construction. Application "
+ appSubmissionContext.getApplicationId()
+ " cannot be routed to any RM.");
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
throw new FederationPolicyException(
"The policy configured for this queue (" + queue + ") "
+ "reject all routing requests by construction. Application in "
+ queue + " cannot be routed to any RM.");
}
}

View File

@ -22,11 +22,10 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -55,50 +54,16 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
this.getClass().getCanonicalName());
// note: this overrides AbstractRouterPolicy and ignores the weights
setPolicyContext(policyContext);
}
/**
* Simply picks a random active subCluster to start the AM (this does NOT
* depend on the weights in the policy).
*
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return a randomly chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
FederationPolicyUtils.validateSubClusterAvailability(list,
blackListSubClusters);
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
for (SubClusterId scId : blackListSubClusters) {
list.remove(scId);
}
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
if (preSelectSubclusters == null || preSelectSubclusters.isEmpty()) {
throw new FederationPolicyException("No available subcluster to choose from.");
}
List<SubClusterId> list = new ArrayList<>(preSelectSubclusters.keySet());
return list.get(rand.nextInt(list.size()));
}
}

View File

@ -19,10 +19,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@ -35,47 +33,30 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
* sub-clusters.
*/
public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blacklist) throws YarnException {
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// note: we cannot pre-compute the weights, as the set of activeSubCluster
// changes dynamically (and this would unfairly spread the load to
// sub-clusters adjacent to an inactive one), hence we need to count/scan
// the list and based on weight pick the next sub-cluster.
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
ArrayList<Float> weightList = new ArrayList<>();
ArrayList<SubClusterId> scIdList = new ArrayList<>();
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
continue;
}
if (entry.getKey() != null
&& activeSubclusters.containsKey(entry.getKey().toId())) {
SubClusterIdInfo key = entry.getKey();
if (key != null && preSelectSubclusters.containsKey(key.toId())) {
weightList.add(entry.getValue());
scIdList.add(entry.getKey().toId());
scIdList.add(key.toId());
}
}
int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList);
if (pickedIndex == -1) {
throw new FederationPolicyException(
"No positive weight found on active subclusters");
throw new FederationPolicyException("No positive weight found on active subclusters");
}
return scIdList.get(pickedIndex);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.federation.store.records;
import java.util.List;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -36,7 +37,7 @@ public abstract class GetSubClustersInfoResponse {
@Public
@Unstable
public static GetSubClustersInfoResponse newInstance(
List<SubClusterInfo> subClusters) {
Collection<SubClusterInfo> subClusters) {
GetSubClustersInfoResponse subClusterInfos =
Records.newRecord(GetSubClustersInfoResponse.class);
subClusterInfos.setSubClusters(subClusters);
@ -61,6 +62,5 @@ public abstract class GetSubClustersInfoResponse {
*/
@Private
@Unstable
public abstract void setSubClusters(List<SubClusterInfo> subClusters);
public abstract void setSubClusters(Collection<SubClusterInfo> subClusters);
}

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -93,12 +95,12 @@ public class GetSubClustersInfoResponsePBImpl
}
@Override
public void setSubClusters(List<SubClusterInfo> subClusters) {
public void setSubClusters(Collection<SubClusterInfo> subClusters) {
if (subClusters == null) {
builder.clearSubClusterInfos();
return;
}
this.subClusterInfos = subClusters;
this.subClusterInfos = subClusters.stream().collect(Collectors.toList());
}
private void initSubClustersInfoList() {

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.federation.policies;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.HashMap;
@ -28,20 +27,26 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.Test;
/**
@ -58,6 +63,9 @@ public abstract class BaseFederationPoliciesTest {
private Random rand = new Random();
private SubClusterId homeSubCluster;
private ReservationSubmissionRequest reservationSubmissionRequest =
mock(ReservationSubmissionRequest.class);
@Test
public void testReinitilialize() throws YarnException {
FederationPolicyInitializationContext fpc =
@ -177,11 +185,60 @@ public abstract class BaseFederationPoliciesTest {
public void setMockActiveSubclusters(int numSubclusters) {
for (int i = 1; i <= numSubclusters; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
SubClusterInfo sci = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83", SubClusterState.SC_RUNNING,
System.currentTimeMillis(), "something");
getActiveSubclusters().put(sc.toId(), sci);
}
}
public String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
// plant a best cluster
if (id == 5) {
mem = 1024 * 277 * 100;
}
String clusterMetrics =
"{\"clusterMetrics\":{\"appsSubmitted\":65, \"appsCompleted\":64,\"appsPending\":0,"
+ "\"appsRunning\":0, \"appsFailed\":0, \"appsKilled\":1,\"reservedMB\":0,\"availableMB\":"
+ mem + ", \"allocatedMB\":0,\"reservedVirtualCores\":0, \"availableVirtualCores\":2216,"
+ "\"allocatedVirtualCores\":0, \"containersAllocated\":0,\"containersReserved\":0,"
+ "\"containersPending\":0,\"totalMB\":28364800, \"totalVirtualCores\":2216,"
+ "\"totalNodes\":278, \"lostNodes\":1,\"unhealthyNodes\":0,\"decommissionedNodes\":0, "
+ "\"rebootedNodes\":0, \"activeNodes\":277}}";
return clusterMetrics;
}
public FederationStateStoreFacade getMemoryFacade() throws YarnException {
// setting up a store and its facade (with caching off)
FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance();
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
FederationStateStore store = new MemoryFederationStateStore();
store.init(conf);
fedFacade.reinitialize(store, conf);
for (SubClusterInfo sinfo : getActiveSubclusters().values()) {
store.registerSubCluster(SubClusterRegisterRequest.newInstance(sinfo));
}
return fedFacade;
}
public ReservationSubmissionRequest getReservationSubmissionRequest() {
return reservationSubmissionRequest;
}
public void setReservationSubmissionRequest(
ReservationSubmissionRequest reservationSubmissionRequest) {
this.reservationSubmissionRequest = reservationSubmissionRequest;
}
public void setupContext() throws YarnException {
FederationPolicyInitializationContext context =
FederationPoliciesTestUtil.initializePolicyContext2(getPolicy(),
getPolicyInfo(), getActiveSubclusters(), getMemoryFacade());
this.setFederationPolicyContext(context);
}
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
@ -115,4 +116,14 @@ public abstract class BaseRouterPoliciesTest
}
}
}
@Test
public void testNullReservationContext() throws Exception {
FederationRouterPolicy policy = ((FederationRouterPolicy) getPolicy());
LambdaTestUtils.intercept(FederationPolicyException.class,
"The ReservationSubmissionRequest cannot be null.",
() -> policy.getReservationHomeSubcluster(null));
}
}

View File

@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -50,8 +49,7 @@ public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest {
setMockActiveSubclusters(numSubclusters);
// initialize policy with context
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
setupContext();
}
@Test

View File

@ -17,12 +17,13 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -46,12 +47,14 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
long now = Time.now();
// simulate 20 active subclusters
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterInfo federationSubClusterInfo =
SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i));
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
float weight = getRand().nextInt(2);
if (i == 5) {
@ -67,12 +70,11 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
// initialize policy with context
setupContext();
}
private String generateClusterMetricsInfo(int id) {
public String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
// plant a best cluster
@ -106,7 +108,7 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
}
@Test
public void testIfNoSubclustersWithWeightOne() {
public void testIfNoSubclustersWithWeightOne() throws Exception {
setPolicy(new LoadBasedRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
@ -123,15 +125,13 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
try {
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext(), null);
fail();
} catch (YarnException ex) {
Assert.assertTrue(
ex.getMessage().contains("Zero Active Subcluster with weight 1"));
}
ConfigurableFederationPolicy policy = getPolicy();
FederationPoliciesTestUtil.initializePolicyContext(policy,
getPolicyInfo(), getActiveSubclusters());
LambdaTestUtils.intercept(YarnException.class, "Zero Active Subcluster with weight 1.",
() -> ((FederationRouterPolicy) policy).
getHomeSubcluster(getApplicationSubmissionContext(), null));
}
}

View File

@ -73,6 +73,7 @@ public class TestLocalityRouterPolicy extends TestWeightedRandomRouterPolicy {
configureWeights(4);
// initialize policy with context
initializePolicy(new YarnConfiguration());
}
@ -86,9 +87,7 @@ public class TestLocalityRouterPolicy extends TestWeightedRandomRouterPolicy {
.newInstance("queue1", getPolicy().getClass().getCanonicalName(),
buf));
getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
FederationPoliciesTestUtil
.initializePolicyContext(getFederationPolicyContext(), getPolicy(),
getPolicyInfo(), getActiveSubclusters(), conf);
setupContext();
}
/**

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@ -54,10 +55,11 @@ public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
// with 5% omit a subcluster
if (getRand().nextFloat() < 0.95f || i == 5) {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
long now = Time.now();
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
}
float weight = getRand().nextFloat();
if (i == 5) {
@ -105,7 +107,7 @@ public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
getPolicyInfo(), getActiveSubclusters());
intercept(FederationPolicyException.class,
"No Active Subcluster with weight vector greater than zero",
"No Active Subcluster with weight vector greater than zero.",
() -> ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext(), null));
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
@ -39,8 +38,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
setMockActiveSubclusters(2);
// initialize policy with context
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
setupContext();
}
@ -59,5 +57,4 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
false, false, 0, Resources.none(), null, false, null, null);
localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
}
}

View File

@ -18,15 +18,14 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -44,14 +43,14 @@ public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest {
setPolicyInfo(mock(WeightedPolicyInfo.class));
for (int i = 1; i <= 2; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
long now = Time.now();
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
}
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
mock(WeightedPolicyInfo.class), getActiveSubclusters());
setupContext();
}
@Test

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@ -32,7 +33,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -51,8 +51,7 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
configureWeights(20);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
setupContext();
}
public void configureWeights(float numSubClusters) {
@ -68,10 +67,11 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
// with 5% omit a subcluster
if (getRand().nextFloat() < 0.95f) {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
long now = Time.now();
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
}
// 80% of the weight is evenly spread, 20% is randomly generated

View File

@ -159,6 +159,51 @@ public final class FederationPoliciesTestUtil {
new Configuration());
}
public static FederationPolicyInitializationContext initializePolicyContext2(
ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
Map<SubClusterId, SubClusterInfo> activeSubClusters,
FederationStateStoreFacade facade) throws YarnException {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(null, initResolver(), facade,
SubClusterId.newInstance("homesubcluster"));
return initializePolicyContext2(context, policy, policyInfo, activeSubClusters);
}
public static FederationPolicyInitializationContext initializePolicyContext2(
ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
Map<SubClusterId, SubClusterInfo> activeSubClusters)
throws YarnException {
return initializePolicyContext2(policy, policyInfo, activeSubClusters, initFacade());
}
public static FederationPolicyInitializationContext initializePolicyContext2(
FederationPolicyInitializationContext fpc,
ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo,
Map<SubClusterId, SubClusterInfo> activeSubClusters)
throws YarnException {
ByteBuffer buf = policyInfo.toByteBuffer();
fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
.newInstance("queue1", policy.getClass().getCanonicalName(), buf));
if (fpc.getFederationStateStoreFacade() == null) {
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
FederationStateStore fss = mock(FederationStateStore.class);
if (activeSubClusters == null) {
activeSubClusters = new HashMap<>();
}
GetSubClustersInfoResponse response = GetSubClustersInfoResponse.newInstance(
activeSubClusters.values());
when(fss.getSubClusters(any())).thenReturn(response);
facade.reinitialize(fss, new Configuration());
fpc.setFederationStateStoreFacade(facade);
}
policy.reinitialize(fpc);
return fpc;
}
/**
* Initialize a {@link SubClusterResolver}.
*

View File

@ -34,6 +34,7 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
@ -179,19 +180,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
FederationPolicyUtils.validateSubClusterAvailability(
list, blackListSubClusters);
Collection<SubClusterId> keySet = activeSubclusters.keySet();
FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters);
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
for (SubClusterId scId : blackListSubClusters) {
list.remove(scId);
}
keySet.removeAll(blackListSubClusters);
}
List<SubClusterId> list = keySet.stream().collect(Collectors.toList());
return list.get(rand.nextInt(list.size()));
}