YARN-5325. Stateless ARMRMProxy policies implementation. (Carlo Curino via Subru).

(cherry picked from commit 11c5336522)
This commit is contained in:
Subru Krishnan 2016-10-13 17:59:13 -07:00 committed by Carlo Curino
parent 0662996b6a
commit 1dadd0b45a
31 changed files with 1842 additions and 345 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,82 +16,87 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
package org.apache.hadoop.yarn.server.federation.policies;
import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.Map;
/**
* Abstract class provides common validation of reinitialize(), for all
* policies that are "weight-based".
* Base abstract class for a weighted {@link ConfigurableFederationPolicy}.
*/
public abstract class BaseWeightedRouterPolicy
implements FederationRouterPolicy {
public abstract class AbstractConfigurableFederationPolicy
implements ConfigurableFederationPolicy {
private WeightedPolicyInfo policyInfo = null;
private FederationPolicyInitializationContext policyContext;
private boolean isDirty;
public BaseWeightedRouterPolicy() {
public AbstractConfigurableFederationPolicy() {
}
@Override
public void reinitialize(FederationPolicyInitializationContext
federationPolicyContext)
public void reinitialize(
FederationPolicyInitializationContext initializationContext)
throws FederationPolicyInitializationException {
isDirty = true;
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
.validate(initializationContext, this.getClass().getCanonicalName());
// perform consistency checks
WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo
.fromByteBuffer(
federationPolicyContext.getSubClusterPolicyConfiguration()
.getParams());
WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo.fromByteBuffer(
initializationContext.getSubClusterPolicyConfiguration().getParams());
// if nothing has changed skip the rest of initialization
// and signal to childs that the reinit is free via isDirty var.
if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
isDirty = false;
return;
}
validate(newPolicyInfo);
setPolicyInfo(newPolicyInfo);
this.policyContext = federationPolicyContext;
this.policyContext = initializationContext;
}
/**
* Overridable validation step for the policy configuration.
*
* @param newPolicyInfo the configuration to test.
* @throws FederationPolicyInitializationException if the configuration is
* not valid.
*
* @throws FederationPolicyInitializationException if the configuration is not
* valid.
*/
public void validate(WeightedPolicyInfo newPolicyInfo) throws
FederationPolicyInitializationException {
public void validate(WeightedPolicyInfo newPolicyInfo)
throws FederationPolicyInitializationException {
if (newPolicyInfo == null) {
throw new FederationPolicyInitializationException("The policy to "
+ "validate should not be null.");
}
Map<SubClusterIdInfo, Float> newWeights =
newPolicyInfo.getRouterPolicyWeights();
if (newWeights == null || newWeights.size() < 1) {
throw new FederationPolicyInitializationException(
"Weight vector cannot be null/empty.");
"The policy to " + "validate should not be null.");
}
}
/**
* Returns true whether the last reinitialization requires actual changes, or
* was "free" as the weights have not changed. This is used by subclasses
* overriding reinitialize and calling super.reinitialize() to know wheter to
* quit early.
*
* @return whether more work is needed to initialize.
*/
public boolean getIsDirty() {
return isDirty;
}
/**
* Getter method for the configuration weights.
*
* @return the {@link WeightedPolicyInfo} representing the policy
* configuration.
* configuration.
*/
public WeightedPolicyInfo getPolicyInfo() {
return policyInfo;
@ -101,15 +106,15 @@ public abstract class BaseWeightedRouterPolicy
* Setter method for the configuration weights.
*
* @param policyInfo the {@link WeightedPolicyInfo} representing the policy
* configuration.
* configuration.
*/
public void setPolicyInfo(
WeightedPolicyInfo policyInfo) {
public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
this.policyInfo = policyInfo;
}
/**
* Getter method for the {@link FederationPolicyInitializationContext}.
*
* @return the context for this policy.
*/
public FederationPolicyInitializationContext getPolicyContext() {
@ -118,6 +123,7 @@ public abstract class BaseWeightedRouterPolicy
/**
* Setter method for the {@link FederationPolicyInitializationContext}.
*
* @param policyContext the context to assign to this policy.
*/
public void setPolicyContext(
@ -130,13 +136,14 @@ public abstract class BaseWeightedRouterPolicy
* FederationStateStoreFacade} and validate it not being null/empty.
*
* @return the map of ids to info for all active subclusters.
*
* @throws YarnException if we can't get the list.
*/
protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext()
.getFederationStateStoreFacade().getSubClusters(true);
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getPolicyContext().getFederationStateStoreFacade().getSubClusters(true);
if (activeSubclusters == null || activeSubclusters.size() < 1) {
throw new NoActiveSubclustersException(
@ -145,6 +152,4 @@ public abstract class BaseWeightedRouterPolicy
return activeSubclusters;
}
}

View File

@ -31,14 +31,11 @@ public interface ConfigurableFederationPolicy {
* policies. The implementor should provide try-n-swap semantics, and retain
* state if possible.
*
* @param federationPolicyInitializationContext the new context to provide to
* implementor.
* @param policyContext the new context to provide to implementor.
*
* @throws FederationPolicyInitializationException in case the initialization
* fails.
* fails.
*/
void reinitialize(
FederationPolicyInitializationContext
federationPolicyInitializationContext)
void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -30,6 +31,7 @@ public class FederationPolicyInitializationContext {
private SubClusterPolicyConfiguration federationPolicyConfiguration;
private SubClusterResolver federationSubclusterResolver;
private FederationStateStoreFacade federationStateStoreFacade;
private SubClusterId homeSubcluster;
public FederationPolicyInitializationContext() {
federationPolicyConfiguration = null;
@ -37,20 +39,19 @@ public class FederationPolicyInitializationContext {
federationStateStoreFacade = null;
}
public FederationPolicyInitializationContext(SubClusterPolicyConfiguration
policy, SubClusterResolver resolver, FederationStateStoreFacade
storeFacade) {
public FederationPolicyInitializationContext(
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
FederationStateStoreFacade storeFacade) {
this.federationPolicyConfiguration = policy;
this.federationSubclusterResolver = resolver;
this.federationStateStoreFacade = storeFacade;
}
/**
* Getter for the {@link SubClusterPolicyConfiguration}.
*
* @return the {@link SubClusterPolicyConfiguration} to be used for
* initialization.
* initialization.
*/
public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
return federationPolicyConfiguration;
@ -59,8 +60,8 @@ public class FederationPolicyInitializationContext {
/**
* Setter for the {@link SubClusterPolicyConfiguration}.
*
* @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration}
* to be used for initialization.
* @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} to
* be used for initialization.
*/
public void setSubClusterPolicyConfiguration(
SubClusterPolicyConfiguration fedPolicyConfiguration) {
@ -80,7 +81,7 @@ public class FederationPolicyInitializationContext {
* Setter for the {@link SubClusterResolver}.
*
* @param federationSubclusterResolver the {@link SubClusterResolver} to be
* used for initialization.
* used for initialization.
*/
public void setFederationSubclusterResolver(
SubClusterResolver federationSubclusterResolver) {
@ -105,4 +106,24 @@ public class FederationPolicyInitializationContext {
FederationStateStoreFacade federationStateStoreFacade) {
this.federationStateStoreFacade = federationStateStoreFacade;
}
/**
* Returns the current home sub-cluster. Useful for default policy behaviors.
*
* @return the home sub-cluster.
*/
public SubClusterId getHomeSubcluster() {
return homeSubcluster;
}
/**
* Sets in the context the home sub-cluster. Useful for default policy
* behaviors.
*
* @param homeSubcluster value to set.
*/
public void setHomeSubcluster(SubClusterId homeSubcluster) {
this.homeSubcluster = homeSubcluster;
}
}

View File

@ -25,50 +25,44 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
public final class FederationPolicyInitializationContextValidator {
private FederationPolicyInitializationContextValidator() {
//disable constructor per checkstyle
// disable constructor per checkstyle
}
public static void validate(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
String myType) throws FederationPolicyInitializationException {
FederationPolicyInitializationContext policyContext, String myType)
throws FederationPolicyInitializationException {
if (myType == null) {
throw new FederationPolicyInitializationException("The myType parameter"
+ " should not be null.");
throw new FederationPolicyInitializationException(
"The myType parameter" + " should not be null.");
}
if (federationPolicyInitializationContext == null) {
if (policyContext == null) {
throw new FederationPolicyInitializationException(
"The FederationPolicyInitializationContext provided is null. Cannot"
+ " reinitalize "
+ "successfully.");
+ " reinitalize " + "successfully.");
}
if (federationPolicyInitializationContext.getFederationStateStoreFacade()
== null) {
if (policyContext.getFederationStateStoreFacade() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacade provided is null. Cannot"
+ " reinitalize successfully.");
}
if (federationPolicyInitializationContext.getFederationSubclusterResolver()
== null) {
if (policyContext.getFederationSubclusterResolver() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacase provided is null. Cannot"
+ " reinitalize successfully.");
}
if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
== null) {
if (policyContext.getSubClusterPolicyConfiguration() == null) {
throw new FederationPolicyInitializationException(
"The FederationSubclusterResolver provided is null. Cannot "
+ "reinitalize successfully.");
}
String intendedType =
federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
.getType();
policyContext.getSubClusterPolicyConfiguration().getType();
if (!myType.equals(intendedType)) {
throw new FederationPolicyInitializationException(

View File

@ -25,19 +25,19 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo
/**
*
* Implementors need to provide the ability to serliaze a policy and its
* configuration as a {@link SubClusterPolicyConfiguration}, as well as
* provide (re)initialization mechanics for the underlying
* configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
* (re)initialization mechanics for the underlying
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
*
* The serialization aspects are used by admin APIs or a policy engine to
* store a serialized configuration in the {@code FederationStateStore},
* while the getters methods are used to obtain a propertly inizialized
* policy in the {@code Router} and {@code AMRMProxy} respectively.
* The serialization aspects are used by admin APIs or a policy engine to store
* a serialized configuration in the {@code FederationStateStore}, while the
* getters methods are used to obtain a propertly inizialized policy in the
* {@code Router} and {@code AMRMProxy} respectively.
*
* This interface by design binds together
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and
* provide lifecycle support for serialization and deserialization, to reduce
* configuration mistakes (combining incompatible policies).
* This interface by design binds together {@link FederationAMRMProxyPolicy} and
* {@link FederationRouterPolicy} and provide lifecycle support for
* serialization and deserialization, to reduce configuration mistakes
* (combining incompatible policies).
*
*/
public interface FederationPolicyManager {
@ -50,23 +50,17 @@ public interface FederationPolicyManager {
* the implementors should attempt to reinitalize (retaining state). To affect
* a complete policy reset oldInstance should be null.
*
* @param federationPolicyInitializationContext the current context
* @param oldInstance the existing (possibly null)
* instance.
* @param policyContext the current context
* @param oldInstance the existing (possibly null) instance.
*
* @return an updated {@link FederationAMRMProxyPolicy
}.
* @return an updated {@link FederationAMRMProxyPolicy }.
*
* @throws FederationPolicyInitializationException if the initialization
* cannot be completed
* properly. The oldInstance
* should be still valid in
* case of failed
* initialization.
* cannot be completed properly. The oldInstance should be still
* valid in case of failed initialization.
*/
FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationPolicyInitializationContext policyContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException;
@ -78,21 +72,17 @@ public interface FederationPolicyManager {
* implementors should attempt to reinitalize (retaining state). To affect a
* complete policy reset oldInstance shoulb be set to null.
*
* @param federationPolicyInitializationContext the current context
* @param oldInstance the existing (possibly null)
* instance.
* @param policyContext the current context
* @param oldInstance the existing (possibly null) instance.
*
* @return an updated {@link FederationRouterPolicy}.
*
* @throws FederationPolicyInitializationException if the initalization cannot
* be completed properly. The
* oldInstance should be still
* valid in case of failed
* initialization.
* be completed properly. The oldInstance should be still valid in
* case of failed initialization.
*/
FederationRouterPolicy getRouterPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationPolicyInitializationContext policyContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException;
@ -102,23 +92,24 @@ public interface FederationPolicyManager {
* store.
*
* @return a valid policy configuration representing this object
* parametrization.
* parametrization.
*
* @throws FederationPolicyInitializationException if the current state cannot
* be serialized properly
* be serialized properly
*/
SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException;
/**
* This method returns the queue this policy is configured for.
*
* @return the name of the queue.
*/
String getQueue();
/**
* This methods provides a setter for the queue this policy is specified for.
*
* @param queue the name of the queue.
*/
void setQueue(String queue);

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
/**
* Base abstract class for {@link FederationAMRMProxyPolicy} implementations,
* that provides common validation for reinitialization.
*/
public abstract class AbstractAMRMProxyPolicy extends
AbstractConfigurableFederationPolicy implements FederationAMRMProxyPolicy {
@Override
public void validate(WeightedPolicyInfo newPolicyInfo)
throws FederationPolicyInitializationException {
super.validate(newPolicyInfo);
Map<SubClusterIdInfo, Float> newWeights =
newPolicyInfo.getAMRMPolicyWeights();
if (newWeights == null || newWeights.size() < 1) {
throw new FederationPolicyInitializationException(
"Weight vector cannot be null/empty.");
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* An implementation of the {@link FederationAMRMProxyPolicy} that simply
* broadcasts each {@link ResourceRequest} to all the available sub-clusters.
*/
public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private Set<SubClusterId> knownClusterIds = new HashSet<>();
@Override
public void reinitialize(
FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
// overrides initialize to avoid weight checks that do no apply for
// this policy.
FederationPolicyInitializationContextValidator
.validate(policyContext, this.getClass().getCanonicalName());
setPolicyContext(policyContext);
}
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
Map<SubClusterId, List<ResourceRequest>> answer = new HashMap<>();
// simply broadcast the resource request to all sub-clusters
for (SubClusterId subClusterId : activeSubclusters.keySet()) {
answer.put(subClusterId, resourceRequests);
knownClusterIds.add(subClusterId);
}
return answer;
}
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
if (!knownClusterIds.contains(subClusterId)) {
throw new UnknownSubclusterException(
"The response is received from a subcluster that is unknown to this "
+ "policy.");
}
// stateless policy does not care about responses
}
}

View File

@ -17,18 +17,18 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import java.util.List;
import java.util.Map;
/**
* Implementors of this interface provide logic to split the list of {@link
* ResourceRequest}s received by the AM among various RMs.
* Implementors of this interface provide logic to split the list of
* {@link ResourceRequest}s received by the AM among various RMs.
*/
public interface FederationAMRMProxyPolicy
extends ConfigurableFederationPolicy {
@ -37,18 +37,17 @@ public interface FederationAMRMProxyPolicy
* Splits the {@link ResourceRequest}s from the client across one or more
* sub-clusters based on the policy semantics (e.g., broadcast, load-based).
*
* @param resourceRequests the list of {@link ResourceRequest}s from the
* AM to be split
* @param resourceRequests the list of {@link ResourceRequest}s from the AM to
* be split
*
* @return map of sub-cluster as identified by {@link SubClusterId} to the
* list of {@link ResourceRequest}s that should be forwarded to it
* list of {@link ResourceRequest}s that should be forwarded to it
*
* @throws YarnException in case the request is malformed or no viable
* sub-clusters can be found.
* sub-clusters can be found.
*/
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests)
throws YarnException;
List<ResourceRequest> resourceRequests) throws YarnException;
/**
* This method should be invoked to notify the policy about responses being
@ -60,7 +59,7 @@ public interface FederationAMRMProxyPolicy
*
* @throws YarnException in case the response is not valid
*/
void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException;
void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response)
throws YarnException;
}

View File

@ -0,0 +1,583 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* An implementation of the {@link FederationAMRMProxyPolicy} interface that
* carefully multicasts the requests with the following behavior:
*
* <p>
* Host localized {@link ResourceRequest}s are always forwarded to the RM that
* owns the corresponding node, based on the feedback of a
* {@link SubClusterResolver}. If the {@link SubClusterResolver} cannot resolve
* this node we default to forwarding the {@link ResourceRequest} to the home
* sub-cluster.
* </p>
*
* <p>
* Rack localized {@link ResourceRequest}s are forwarded to the RMs that owns
* the corresponding rack. Note that in some deployments each rack could be
* striped across multiple RMs. Thsi policy respects that. If the
* {@link SubClusterResolver} cannot resolve this rack we default to forwarding
* the {@link ResourceRequest} to the home sub-cluster.
* </p>
*
* <p>
* ANY requests corresponding to node/rack local requests are forwarded only to
* the set of RMs that owns the corresponding localized requests. The number of
* containers listed in each ANY is proportional to the number of localized
* container requests (associated to this ANY via the same allocateRequestId).
* </p>
*
* <p>
* ANY that are not associated to node/rack local requests are split among RMs
* based on the "weights" in the {@link WeightedPolicyInfo} configuration *and*
* headroom information. The {@code headroomAlpha} parameter of the policy
* configuration indicates how much headroom contributes to the splitting
* choice. Value of 1.0f indicates the weights are interpreted only as 0/1
* boolean but all splitting is based on the advertised headroom (fallback to
* 1/N for RMs that we don't have headroom info from). An {@code headroomAlpha}
* value of 0.0f means headroom is ignored and all splitting decisions are
* proportional to the "weights" in the configuration of the policy.
* </p>
*
* <p>
* ANY of zero size are forwarded to all known subclusters (i.e., subclusters
* where we scheduled containers before), as they may represent a user attempt
* to cancel a previous request (and we are mostly stateless now, so should
* forward to all known RMs).
* </p>
*
* <p>
* Invariants:
* </p>
*
* <p>
* The policy always excludes non-active RMs.
* </p>
*
* <p>
* The policy always excludes RMs that do not appear in the policy configuration
* weights, or have a weight of 0 (even if localized resources explicit refer to
* it).
* </p>
*
* <p>
* (Bar rounding to closest ceiling of fractional containers) The sum of
* requests made to multiple RMs at the ANY level "adds-up" to the user request.
* The maximum possible excess in a given request is a number of containers less
* or equal to number of sub-clusters in the federation.
* </p>
*/
public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
public static final Logger LOG =
LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
private Map<SubClusterId, Float> weights;
private SubClusterResolver resolver;
private Map<SubClusterId, Resource> headroom;
private float hrAlpha;
private FederationStateStoreFacade federationFacade;
private AllocationBookkeeper bookkeeper;
private SubClusterId homeSubcluster;
@Override
public void reinitialize(
FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
// save reference to old weights
WeightedPolicyInfo tempPolicy = getPolicyInfo();
super.reinitialize(policyContext);
if (!getIsDirty()) {
return;
}
Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
boolean allInactive = true;
WeightedPolicyInfo policy = getPolicyInfo();
if (policy.getAMRMPolicyWeights() == null
|| policy.getAMRMPolicyWeights().size() == 0) {
allInactive = false;
} else {
for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights()
.entrySet()) {
if (e.getValue() > 0) {
allInactive = false;
}
newWeightsConverted.put(e.getKey().toId(), e.getValue());
}
}
if (allInactive) {
// reset the policyInfo and throw
setPolicyInfo(tempPolicy);
throw new FederationPolicyInitializationException(
"The weights used to configure "
+ "this policy are all set to zero! (no ResourceRequest could be "
+ "forwarded with this setting.)");
}
if (policyContext.getHomeSubcluster() == null) {
setPolicyInfo(tempPolicy);
throw new FederationPolicyInitializationException("The homeSubcluster "
+ "filed in the context must be initialized to use this policy");
}
weights = newWeightsConverted;
resolver = policyContext.getFederationSubclusterResolver();
if (headroom == null) {
headroom = new ConcurrentHashMap<>();
}
hrAlpha = policy.getHeadroomAlpha();
this.federationFacade =
policyContext.getFederationStateStoreFacade();
this.bookkeeper = new AllocationBookkeeper();
this.homeSubcluster = policyContext.getHomeSubcluster();
}
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
// stateless policy does not care about responses except tracking headroom
headroom.put(subClusterId, response.getAvailableResources());
}
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
// object used to accumulate statistics about the answer, initialize with
// active subclusters.
bookkeeper.reinitialize(federationFacade.getSubClusters(true));
List<ResourceRequest> nonLocalizedRequests =
new ArrayList<ResourceRequest>();
SubClusterId targetId = null;
Set<SubClusterId> targetIds = null;
// if the RR is resolved to a local subcluster add it directly (node and
// resolvable racks)
for (ResourceRequest rr : resourceRequests) {
targetId = null;
targetIds = null;
// Handle: ANY (accumulated for later)
if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
nonLocalizedRequests.add(rr);
continue;
}
// Handle "node" requests
try {
targetId = resolver.getSubClusterForNode(rr.getResourceName());
} catch (YarnException e) {
// this might happen as we can't differentiate node from rack names
// we log altogether later
}
if (bookkeeper.isActiveAndEnabled(targetId)) {
bookkeeper.addLocalizedNodeRR(targetId, rr);
continue;
}
// Handle "rack" requests
try {
targetIds = resolver.getSubClustersForRack(rr.getResourceName());
} catch (YarnException e) {
// this might happen as we can't differentiate node from rack names
// we log altogether later
}
if (targetIds != null && targetIds.size() > 0) {
for (SubClusterId tid : targetIds) {
if (bookkeeper.isActiveAndEnabled(tid)) {
bookkeeper.addRackRR(tid, rr);
}
}
continue;
}
// Handle node/rack requests that the SubClusterResolver cannot map to
// any cluster. Defaulting to home subcluster.
if (LOG.isDebugEnabled()) {
LOG.debug("ERROR resolving sub-cluster for resourceName: "
+ rr.getResourceName() + " we are falling back to homeSubCluster:"
+ homeSubcluster);
}
// If home-subcluster is not active, ignore node/rack request
if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
+ "defaulting to is not active, the ResourceRequest "
+ "will be ignored.");
}
}
}
// handle all non-localized requests (ANY)
splitAnyRequests(nonLocalizedRequests, bookkeeper);
return bookkeeper.getAnswer();
}
/**
* It splits a list of non-localized resource requests among sub-clusters.
*/
private void splitAnyRequests(List<ResourceRequest> originalResourceRequests,
AllocationBookkeeper allocationBookkeeper) throws YarnException {
for (ResourceRequest resourceRequest : originalResourceRequests) {
// FIRST: pick the target set of subclusters (based on whether this RR
// is associated with other localized requests via an allocationId)
Long allocationId = resourceRequest.getAllocationRequestId();
Set<SubClusterId> targetSubclusters;
if (allocationBookkeeper.getSubClustersForId(allocationId) != null) {
targetSubclusters =
allocationBookkeeper.getSubClustersForId(allocationId);
} else {
targetSubclusters = allocationBookkeeper.getActiveAndEnabledSC();
}
// SECOND: pick how much to ask to each RM for each request
splitIndividualAny(resourceRequest, targetSubclusters,
allocationBookkeeper);
}
}
/**
* Return a projection of this ANY {@link ResourceRequest} that belongs to
* this sub-cluster. This is done based on the "count" of the containers that
* require locality in each sublcuster (if any) or based on the "weights" and
* headroom.
*/
private void splitIndividualAny(ResourceRequest originalResourceRequest,
Set<SubClusterId> targetSubclusters,
AllocationBookkeeper allocationBookkeeper) {
long allocationId = originalResourceRequest.getAllocationRequestId();
for (SubClusterId targetId : targetSubclusters) {
float numContainer = originalResourceRequest.getNumContainers();
// If the ANY request has 0 containers to begin with we must forward it to
// any RM we have previously contacted (this might be the user way
// to cancel a previous request).
if (numContainer == 0 && headroom.containsKey(targetId)) {
allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
}
// If ANY is associated with localized asks, split based on their ratio
if (allocationBookkeeper.getSubClustersForId(allocationId) != null) {
float localityBasedWeight = getLocalityBasedWeighting(allocationId,
targetId, allocationBookkeeper);
numContainer = numContainer * localityBasedWeight;
} else {
// split ANY based on load and policy configuration
float headroomWeighting =
getHeadroomWeighting(targetId, allocationBookkeeper);
float policyWeighting =
getPolicyConfigWeighting(targetId, allocationBookkeeper);
// hrAlpha controls how much headroom influencing decision
numContainer = numContainer
* (hrAlpha * headroomWeighting + (1 - hrAlpha) * policyWeighting);
}
// if the calculated request is non-empty add it to the answer
if (numContainer > 0) {
ResourceRequest out =
ResourceRequest.newInstance(originalResourceRequest.getPriority(),
originalResourceRequest.getResourceName(),
originalResourceRequest.getCapability(),
originalResourceRequest.getNumContainers(),
originalResourceRequest.getRelaxLocality(),
originalResourceRequest.getNodeLabelExpression(),
originalResourceRequest.getExecutionTypeRequest());
out.setAllocationRequestId(allocationId);
out.setNumContainers((int) Math.ceil(numContainer));
if (out.isAnyLocation(out.getResourceName())) {
allocationBookkeeper.addAnyRR(targetId, out);
} else {
allocationBookkeeper.addRackRR(targetId, out);
}
}
}
}
/**
* Compute the weight to assign to a subcluster based on how many local
* requests a subcluster is target of.
*/
private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
float totWeight = allocationBookkeeper.getTotNumLocalizedContainers();
float localWeight =
allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
return totWeight > 0 ? localWeight / totWeight : 0;
}
/**
* Compute the "weighting" to give to a sublcuster based on the configured
* policy weights (for the active subclusters).
*/
private float getPolicyConfigWeighting(SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
float totWeight = allocationBookkeeper.totPolicyWeight;
Float localWeight = weights.get(targetId);
return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0;
}
/**
* Compute the weighting based on available headroom. This is proportional to
* the available headroom memory announced by RM, or to 1/N for RMs we have
* not seen yet. If all RMs report zero headroom, we fallback to 1/N again.
*/
private float getHeadroomWeighting(SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
// baseline weight for all RMs
float headroomWeighting =
1 / (float) allocationBookkeeper.getActiveAndEnabledSC().size();
// if we have headroom infomration for this sub-cluster (and we are safe
// from /0 issues)
if (headroom.containsKey(targetId)
&& allocationBookkeeper.totHeadroomMemory > 0) {
// compute which portion of the RMs that are active/enabled have reported
// their headroom (needed as adjustment factor)
// (note: getActiveAndEnabledSC should never be null/zero)
float ratioHeadroomKnown = allocationBookkeeper.totHeadRoomEnabledRMs
/ (float) allocationBookkeeper.getActiveAndEnabledSC().size();
// headroomWeighting is the ratio of headroom memory in the targetId
// cluster / total memory. The ratioHeadroomKnown factor is applied to
// adjust for missing information and ensure sum of allocated containers
// closely approximate what the user asked (small excess).
headroomWeighting = (headroom.get(targetId).getMemorySize()
/ allocationBookkeeper.totHeadroomMemory) * (ratioHeadroomKnown);
}
return headroomWeighting;
}
/**
* This helper class is used to book-keep the requests made to each
* subcluster, and maintain useful statistics to split ANY requests.
*/
private final class AllocationBookkeeper {
// the answer being accumulated
private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
// stores how many containers we have allocated in each RM for localized
// asks, used to correctly "spread" the corresponding ANY
private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
new HashMap<>();
private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
private long totNumLocalizedContainers = 0;
private float totHeadroomMemory = 0;
private int totHeadRoomEnabledRMs = 0;
private float totPolicyWeight = 0;
private void reinitialize(
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
// reset data structures
answer.clear();
countContainersPerRM.clear();
activeAndEnabledSC.clear();
totNumLocalizedContainers = 0;
totHeadroomMemory = 0;
totHeadRoomEnabledRMs = 0;
totPolicyWeight = 0;
// pre-compute the set of subclusters that are both active and enabled by
// the policy weights, and accumulate their total weight
for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
if (entry.getValue() > 0
&& activeSubclusters.containsKey(entry.getKey())) {
activeAndEnabledSC.add(entry.getKey());
totPolicyWeight += entry.getValue();
}
}
if (activeAndEnabledSC.size() < 1) {
throw new NoActiveSubclustersException(
"None of the subclusters enabled in this policy (weight>0) are "
+ "currently active we cannot forward the ResourceRequest(s)");
}
// pre-compute headroom-based weights for active/enabled subclusters
for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) {
if (activeAndEnabledSC.contains(r.getKey())) {
totHeadroomMemory += r.getValue().getMemorySize();
totHeadRoomEnabledRMs++;
}
}
}
/**
* Add to the answer a localized node request, and keeps track of statistics
* on a per-allocation-id and per-subcluster bases.
*/
private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
}
if (!countContainersPerRM.get(rr.getAllocationRequestId())
.containsKey(targetId)) {
countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId,
new AtomicLong(0));
}
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
.addAndGet(rr.getNumContainers());
totNumLocalizedContainers += rr.getNumContainers();
internalAddToAnswer(targetId, rr);
}
/**
* Add a rack-local request to the final asnwer.
*/
public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr);
}
/**
* Add an ANY request to the final answer.
*/
private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr);
}
private void internalAddToAnswer(SubClusterId targetId,
ResourceRequest partialRR) {
if (!answer.containsKey(targetId)) {
answer.put(targetId, new ArrayList<ResourceRequest>());
}
answer.get(targetId).add(partialRR);
}
/**
* Return all known subclusters associated with an allocation id.
*
* @param allocationId the allocation id considered
*
* @return the list of {@link SubClusterId}s associated with this allocation
* id
*/
private Set<SubClusterId> getSubClustersForId(long allocationId) {
if (countContainersPerRM.get(allocationId) == null) {
return null;
}
return countContainersPerRM.get(allocationId).keySet();
}
/**
* Return the answer accumulated so far.
*
* @return the answer
*/
private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
return answer;
}
/**
* Return the set of sub-clusters that are both active and allowed by our
* policy (weight > 0).
*
* @return a set of active and enabled {@link SubClusterId}s
*/
private Set<SubClusterId> getActiveAndEnabledSC() {
return activeAndEnabledSC;
}
/**
* Return the total number of container coming from localized requests.
*/
private long getTotNumLocalizedContainers() {
return totNumLocalizedContainers;
}
/**
* Returns the number of containers matching an allocation Id that are
* localized in the targetId subcluster.
*/
private long getNumLocalizedContainers(long allocationId,
SubClusterId targetId) {
AtomicLong c = countContainersPerRM.get(allocationId).get(targetId);
return c == null ? 0 : c.get();
}
/**
* Returns true is the subcluster request is both active and enabled.
*/
private boolean isActiveAndEnabled(SubClusterId targetId) {
if (targetId == null) {
return false;
} else {
return getActiveAndEnabledSC().contains(targetId);
}
}
}
}

View File

@ -17,4 +17,3 @@
*/
/** AMRMPRoxy policies. **/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

View File

@ -17,10 +17,19 @@
package org.apache.hadoop.yarn.server.federation.policies.dao;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.api.json.JSONUnmarshaller;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -29,24 +38,16 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.api.json.JSONUnmarshaller;
/**
* This is a DAO class for the configuration of parameteres for federation
* policies. This generalizes several possible configurations as two lists of
* {@link SubClusterIdInfo} and corresponding weights as a
* {@link Float}. The interpretation of the weight is left to the logic in
* the policy.
* {@link SubClusterIdInfo} and corresponding weights as a {@link Float}. The
* interpretation of the weight is left to the logic in the policy.
*/
@InterfaceAudience.Private
@ -57,12 +58,14 @@ public class WeightedPolicyInfo {
private static final Logger LOG =
LoggerFactory.getLogger(WeightedPolicyInfo.class);
private static JSONJAXBContext jsonjaxbContext = initContext();
private Map<SubClusterIdInfo, Float> routerPolicyWeights = new HashMap<>();
private Map<SubClusterIdInfo, Float> amrmPolicyWeights = new HashMap<>();
private float headroomAlpha;
private static JSONJAXBContext jsonjaxbContext = initContext();
public WeightedPolicyInfo() {
// JAXB needs this
}
private static JSONJAXBContext initContext() {
try {
@ -74,8 +77,46 @@ public class WeightedPolicyInfo {
return null;
}
public WeightedPolicyInfo() {
//JAXB needs this
/**
* Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON
* representation.
*
* @param bb the input byte representation.
*
* @return the {@link WeightedPolicyInfo} represented.
*
* @throws FederationPolicyInitializationException if a deserializaiton error
* occurs.
*/
public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
throw new FederationPolicyInitializationException(
"JSONJAXBContext should" + " not be null.");
}
try {
JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
final byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
String params = new String(bytes, Charset.forName("UTF-8"));
WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON(
new StringReader(params), WeightedPolicyInfo.class);
return weightedPolicyInfo;
} catch (JAXBException j) {
throw new FederationPolicyInitializationException(j);
}
}
/**
* Getter of the router weights.
*
* @return the router weights.
*/
public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
return routerPolicyWeights;
}
/**
@ -88,26 +129,9 @@ public class WeightedPolicyInfo {
this.routerPolicyWeights = policyWeights;
}
/**
* Setter method for ARMRMProxy weights.
*
* @param policyWeights the amrmproxy weights.
*/
public void setAMRMPolicyWeights(
Map<SubClusterIdInfo, Float> policyWeights) {
this.amrmPolicyWeights = policyWeights;
}
/**
* Getter of the router weights.
* @return the router weights.
*/
public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
return routerPolicyWeights;
}
/**
* Getter for AMRMProxy weights.
*
* @return the AMRMProxy weights.
*/
public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
@ -115,53 +139,28 @@ public class WeightedPolicyInfo {
}
/**
* Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON
* representation.
* Setter method for ARMRMProxy weights.
*
* @param bb the input byte representation.
*
* @return the {@link WeightedPolicyInfo} represented.
*
* @throws FederationPolicyInitializationException if a deserializaiton error
* occurs.
* @param policyWeights the amrmproxy weights.
*/
public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
throw new FederationPolicyInitializationException("JSONJAXBContext should"
+ " not be null.");
}
try {
JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
final byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
String params = new String(bytes, Charset.forName("UTF-8"));
WeightedPolicyInfo weightedPolicyInfo = unmarshaller
.unmarshalFromJSON(new StringReader(params),
WeightedPolicyInfo.class);
return weightedPolicyInfo;
} catch (JAXBException j) {
throw new FederationPolicyInitializationException(j);
}
public void setAMRMPolicyWeights(Map<SubClusterIdInfo, Float> policyWeights) {
this.amrmPolicyWeights = policyWeights;
}
/**
* Converts the policy into a byte array representation in the input {@link
* ByteBuffer}.
* Converts the policy into a byte array representation in the input
* {@link ByteBuffer}.
*
* @return byte array representation of this policy configuration.
*
* @throws FederationPolicyInitializationException if a serialization error
* occurs.
* occurs.
*/
public ByteBuffer toByteBuffer()
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
throw new FederationPolicyInitializationException("JSONJAXBContext should"
+ " not be null.");
throw new FederationPolicyInitializationException(
"JSONJAXBContext should" + " not be null.");
}
try {
String s = toJSONString();
@ -186,22 +185,21 @@ public class WeightedPolicyInfo {
return false;
}
WeightedPolicyInfo otherPolicy =
(WeightedPolicyInfo) other;
WeightedPolicyInfo otherPolicy = (WeightedPolicyInfo) other;
Map<SubClusterIdInfo, Float> otherAMRMWeights =
otherPolicy.getAMRMPolicyWeights();
Map<SubClusterIdInfo, Float> otherRouterWeights =
otherPolicy.getRouterPolicyWeights();
boolean amrmWeightsMatch = otherAMRMWeights != null &&
getAMRMPolicyWeights() != null &&
CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
getAMRMPolicyWeights().entrySet());
boolean amrmWeightsMatch =
otherAMRMWeights != null && getAMRMPolicyWeights() != null
&& CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
getAMRMPolicyWeights().entrySet());
boolean routerWeightsMatch = otherRouterWeights != null &&
getRouterPolicyWeights() != null &&
CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
getRouterPolicyWeights().entrySet());
boolean routerWeightsMatch =
otherRouterWeights != null && getRouterPolicyWeights() != null
&& CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
getRouterPolicyWeights().entrySet());
return amrmWeightsMatch && routerWeightsMatch;
}
@ -215,10 +213,10 @@ public class WeightedPolicyInfo {
* Return the parameter headroomAlpha, used by policies that balance
* weight-based and load-based considerations in their decisions.
*
* For policies that use this parameter, values close to 1 indicate that
* most of the decision should be based on currently observed headroom from
* various sub-clusters, values close to zero, indicate that the decision
* should be mostly based on weights and practically ignore current load.
* For policies that use this parameter, values close to 1 indicate that most
* of the decision should be based on currently observed headroom from various
* sub-clusters, values close to zero, indicate that the decision should be
* mostly based on weights and practically ignore current load.
*
* @return the value of headroomAlpha.
*/
@ -227,13 +225,13 @@ public class WeightedPolicyInfo {
}
/**
* Set the parameter headroomAlpha, used by policies that balance
* weight-based and load-based considerations in their decisions.
* Set the parameter headroomAlpha, used by policies that balance weight-based
* and load-based considerations in their decisions.
*
* For policies that use this parameter, values close to 1 indicate that
* most of the decision should be based on currently observed headroom from
* various sub-clusters, values close to zero, indicate that the decision
* should be mostly based on weights and practically ignore current load.
* For policies that use this parameter, values close to 1 indicate that most
* of the decision should be based on currently observed headroom from various
* sub-clusters, values close to zero, indicate that the decision should be
* mostly based on weights and practically ignore current load.
*
* @param headroomAlpha the value to use for balancing.
*/

View File

@ -17,4 +17,3 @@
*/
/** DAO objects for serializing/deserializing policy configurations. **/
package org.apache.hadoop.yarn.server.federation.policies.dao;

View File

@ -17,4 +17,3 @@
*/
/** Exceptions for policies. **/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;

View File

@ -17,4 +17,3 @@
*/
/** Federation Policies. **/
package org.apache.hadoop.yarn.server.federation.policies;

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
/**
* Base abstract class for {@link FederationRouterPolicy} implementations, that
* provides common validation for reinitialization.
*/
public abstract class AbstractRouterPolicy extends
AbstractConfigurableFederationPolicy implements FederationRouterPolicy {
@Override
public void validate(WeightedPolicyInfo newPolicyInfo)
throws FederationPolicyInitializationException {
super.validate(newPolicyInfo);
Map<SubClusterIdInfo, Float> newWeights =
newPolicyInfo.getRouterPolicyWeights();
if (newWeights == null || newWeights.size() < 1) {
throw new FederationPolicyInitializationException(
"Weight vector cannot be null/empty.");
}
}
}

View File

@ -35,11 +35,10 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
* @param appSubmissionContext the context for the app being submitted.
*
* @return the sub-cluster as identified by {@link SubClusterId} to route the
* request to.
* request to.
*
* @throws YarnException if the policy cannot determine a viable subcluster.
*/
SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException;
ApplicationSubmissionContext appSubmissionContext) throws YarnException;
}

View File

@ -17,8 +17,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@ -30,34 +30,27 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.util.Map;
/**
* This implements a simple load-balancing policy. The policy "weights" are
* binary 0/1 values that enable/disable each sub-cluster, and the policy peaks
* the sub-cluster with the least load to forward this application.
*/
public class LoadBasedRouterPolicy
extends BaseWeightedRouterPolicy {
private static final Log LOG =
LogFactory.getLog(LoadBasedRouterPolicy.class);
public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
@Override
public void reinitialize(FederationPolicyInitializationContext
federationPolicyContext)
public void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
// remember old policyInfo
WeightedPolicyInfo tempPolicy = getPolicyInfo();
//attempt new initialization
super.reinitialize(federationPolicyContext);
// attempt new initialization
super.reinitialize(policyContext);
//check extra constraints
// check extra constraints
for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) {
if (weight != 0 && weight != 1) {
//reset to old policyInfo if check fails
// reset to old policyInfo if check fails
setPolicyInfo(tempPolicy);
throw new FederationPolicyInitializationException(
this.getClass().getCanonicalName()
@ -69,18 +62,16 @@ public class LoadBasedRouterPolicy
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
.getRouterPolicyWeights();
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
SubClusterIdInfo chosen = null;
long currBestMem = -1;
for (Map.Entry<SubClusterId, SubClusterInfo> entry :
activeSubclusters
for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
.entrySet()) {
SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
if (weights.containsKey(id) && weights.get(id) > 0) {
@ -95,8 +86,7 @@ public class LoadBasedRouterPolicy
return chosen.toId();
}
private long getAvailableMemory(SubClusterInfo value)
throws YarnException {
private long getAvailableMemory(SubClusterInfo value) throws YarnException {
try {
long mem = -1;
JSONObject obj = new JSONObject(value.getCapability());

View File

@ -17,39 +17,32 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.Map;
/**
* This implements a policy that interprets "weights" as a ordered list of
* preferences among sub-clusters. Highest weight among active subclusters is
* chosen.
*/
public class PriorityRouterPolicy
extends BaseWeightedRouterPolicy {
private static final Log LOG =
LogFactory.getLog(PriorityRouterPolicy.class);
public class PriorityRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
// This finds the sub-cluster with the highest weight among the
// currently active ones.
Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
.getRouterPolicyWeights();
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
SubClusterId chosen = null;
Float currentBest = Float.MIN_VALUE;
for (SubClusterId id : activeSubclusters.keySet()) {

View File

@ -17,6 +17,11 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@ -25,11 +30,6 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* This simple policy picks at uniform random among any of the currently active
* subclusters. This policy is easy to use and good for testing.
@ -39,7 +39,7 @@ import java.util.Random;
* of the "weights", in which case the {@link UniformRandomRouterPolicy} send
* load to them, while {@code WeightedRandomRouterPolicy} does not.
*/
public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
private Random rand;
@ -49,14 +49,14 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
@Override
public void reinitialize(
FederationPolicyInitializationContext federationPolicyContext)
FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
.validate(policyContext, this.getClass().getCanonicalName());
//note: this overrides BaseWeighterRouterPolicy and ignores the weights
// note: this overrides AbstractRouterPolicy and ignores the weights
setPolicyContext(federationPolicyContext);
setPolicyContext(policyContext);
}
/**
@ -64,21 +64,19 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
* depend on the weights in the policy).
*
* @param appSubmissionContext the context for the app being submitted
* (ignored).
* (ignored).
*
* @return a randomly chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
List<SubClusterId> list =
new ArrayList<>(activeSubclusters.keySet());
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
return list.get(rand.nextInt(list.size()));
}

View File

@ -18,32 +18,30 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This policy implements a weighted random sample among currently active
* sub-clusters.
*/
public class WeightedRandomRouterPolicy
extends BaseWeightedRouterPolicy {
public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
private static final Log LOG =
LogFactory.getLog(WeightedRandomRouterPolicy.class);
private static final Logger LOG =
LoggerFactory.getLogger(WeightedRandomRouterPolicy.class);
private Random rand = new Random(System.currentTimeMillis());
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
@ -52,13 +50,13 @@ public class WeightedRandomRouterPolicy
// changes dynamically (and this would unfairly spread the load to
// sub-clusters adjacent to an inactive one), hence we need to count/scan
// the list and based on weight pick the next sub-cluster.
Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
.getRouterPolicyWeights();
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
float totActiveWeight = 0;
for(Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()){
if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey()
.toId())){
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
if (entry.getKey() != null
&& activeSubclusters.containsKey(entry.getKey().toId())) {
totActiveWeight += entry.getValue();
}
}
@ -73,7 +71,7 @@ public class WeightedRandomRouterPolicy
return id;
}
}
//should never happen
// should never happen
return null;
}
}

View File

@ -17,4 +17,3 @@
*/
/** Router policies. **/
package org.apache.hadoop.yarn.server.federation.policies.router;

View File

@ -57,11 +57,11 @@ public abstract class AbstractSubClusterResolver implements SubClusterResolver {
return rackToSubClusters.get(rackname);
}
protected Map<String, SubClusterId> getNodeToSubCluster() {
public Map<String, SubClusterId> getNodeToSubCluster() {
return nodeToSubCluster;
}
protected Map<String, Set<SubClusterId>> getRackToSubClusters() {
public Map<String, Set<SubClusterId>> getRackToSubClusters() {
return rackToSubClusters;
}
}

View File

@ -22,14 +22,17 @@ import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -49,6 +52,7 @@ public abstract class BaseFederationPoliciesTest {
private ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
private Random rand = new Random();
private SubClusterId homeSubCluster;
@Test
public void testReinitilialize() throws YarnException {
@ -88,16 +92,22 @@ public abstract class BaseFederationPoliciesTest {
getPolicy().reinitialize(fpc);
}
@Test(expected = NoActiveSubclustersException.class)
@Test(expected = FederationPolicyException.class)
public void testNoSubclusters() throws YarnException {
// empty the activeSubclusters map
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), new HashMap<>());
ConfigurableFederationPolicy currentPolicy = getPolicy();
if (currentPolicy instanceof FederationRouterPolicy) {
((FederationRouterPolicy) currentPolicy)
ConfigurableFederationPolicy localPolicy = getPolicy();
if (localPolicy instanceof FederationRouterPolicy) {
((FederationRouterPolicy) localPolicy)
.getHomeSubcluster(getApplicationSubmissionContext());
} else {
String[] hosts = new String[] {"host1", "host2" };
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
((FederationAMRMProxyPolicy) localPolicy)
.splitResourceRequests(resourceRequests);
}
}
@ -152,4 +162,12 @@ public abstract class BaseFederationPoliciesTest {
this.rand = rand;
}
public SubClusterId getHomeSubCluster() {
return homeSubCluster;
}
public void setHomeSubCluster(SubClusterId homeSubCluster) {
this.homeSubCluster = homeSubCluster;
}
}

View File

@ -16,22 +16,20 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
/**
* Test class for {@link FederationPolicyInitializationContextValidator}.
*/
@ -45,11 +43,10 @@ public class TestFederationPolicyInitializationContextValidator {
@Before
public void setUp() throws Exception {
goodFacade = FederationPoliciesTestUtil.initFacade();
goodConfig =
new MockPolicyManager().serializeConf();
goodSR =FederationPoliciesTestUtil.initResolver();
context = new
FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade);
goodConfig = new MockPolicyManager().serializeConf();
goodSR = FederationPoliciesTestUtil.initResolver();
context = new FederationPolicyInitializationContext(goodConfig, goodSR,
goodFacade);
}
@Test
@ -100,8 +97,7 @@ public class TestFederationPolicyInitializationContextValidator {
@Override
public FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationPolicyInitializationContext policyContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException {
return null;
@ -109,8 +105,7 @@ public class TestFederationPolicyInitializationContextValidator {
@Override
public FederationRouterPolicy getRouterPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationPolicyInitializationContext policyContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException {
return null;
@ -120,8 +115,8 @@ public class TestFederationPolicyInitializationContextValidator {
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = ByteBuffer.allocate(0);
return SubClusterPolicyConfiguration
.newInstance("queue1", this.getClass().getCanonicalName(), buf);
return SubClusterPolicyConfiguration.newInstance("queue1",
this.getClass().getCanonicalName(), buf);
}
@Override

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test class for the {@link BroadcastAMRMProxyPolicy}.
*/
public class TestBroadcastAMRMProxyFederationPolicy
extends BaseFederationPoliciesTest {
@Before
public void setUp() throws Exception {
setPolicy(new BroadcastAMRMProxyPolicy());
// needed for base test to work
setPolicyInfo(mock(WeightedPolicyInfo.class));
for (int i = 1; i <= 2; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
mock(WeightedPolicyInfo.class), getActiveSubclusters());
}
@Test
public void testSplitAllocateRequest() throws Exception {
// verify the request is broadcasted to all subclusters
String[] hosts = new String[] {"host1", "host2" };
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
Assert.assertTrue(response.size() == 2);
for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
.entrySet()) {
Assert.assertTrue(getActiveSubclusters().get(entry.getKey()) != null);
for (ResourceRequest r : entry.getValue()) {
Assert.assertTrue(resourceRequests.contains(r));
}
}
for (SubClusterId subClusterId : getActiveSubclusters().keySet()) {
for (ResourceRequest r : response.get(subClusterId)) {
Assert.assertTrue(resourceRequests.contains(r));
}
}
}
@Test
public void testNotifyOfResponse() throws Exception {
String[] hosts = new String[] {"host1", "host2" };
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
try {
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
Assert.fail();
} catch (FederationPolicyException f) {
System.out.println("Expected: " + f.getMessage());
}
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));
}
}

View File

@ -0,0 +1,566 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple test class for the {@link LocalityMulticastAMRMProxyPolicy}.
*/
public class TestLocalityMulticastAMRMProxyPolicy
extends BaseFederationPoliciesTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class);
@Before
public void setUp() throws Exception {
setPolicy(new LocalityMulticastAMRMProxyPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// simulate 20 subclusters with a 5% chance of being inactive
for (int i = 0; i < 6; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
// sub-cluster 3 is not active
if (i != 3) {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
float weight = 1 / 10f;
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
// sub-cluster 4 is "disabled" in the weights
if (i == 4) {
routerWeights.put(sc, 0f);
amrmWeights.put(sc, 0f);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
getPolicyInfo().setHeadroomAlpha(0.5f);
setHomeSubCluster(SubClusterId.newInstance("homesubcluster"));
}
@Test
public void testReinitilialize() throws YarnException {
initializePolicy();
}
private void initializePolicy() throws YarnException {
setFederationPolicyContext(new FederationPolicyInitializationContext());
SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
getFederationPolicyContext().setFederationSubclusterResolver(resolver);
ByteBuffer buf = getPolicyInfo().toByteBuffer();
getFederationPolicyContext().setSubClusterPolicyConfiguration(
SubClusterPolicyConfiguration.newInstance("queue1",
getPolicy().getClass().getCanonicalName(), buf));
getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
getActiveSubclusters());
}
@Test
public void testSplitBasedOnHeadroom() throws Exception {
// Tests how the headroom info are used to split based on the capacity
// each RM claims to give us.
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
List<ResourceRequest> resourceRequests = createSimpleRequest();
prepPolicyWithHeadroom();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
// pretty print requests
LOG.info("Initial headroom");
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
// based on headroom, we expect 75 containers to got to subcluster0,
// as it advertise lots of headroom (100), no containers for sublcuster1
// as it advertise zero headroom, 1 to subcluster 2 (as it advertise little
// headroom (1), and 25 to subcluster5 which has unknown headroom, and so
// it gets 1/4th of the load
checkExpectedAllocation(response, "subcluster0", 1, 75);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 1);
checkExpectedAllocation(response, "subcluster5", 1, 25);
// notify a change in headroom and try again
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
LOG.info("After headroom update");
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
// we simulated a change in headroom for subcluster2, which will now
// have the same headroom of subcluster0 and so it splits the requests
// note that the total is still less or equal to (userAsk + numSubClusters)
checkExpectedAllocation(response, "subcluster0", 1, 38);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 38);
checkExpectedAllocation(response, "subcluster5", 1, 25);
}
@Test(timeout = 5000)
public void testStressPolicy() throws Exception {
// Tests how the headroom info are used to split based on the capacity
// each RM claims to give us.
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
int numRR = 1000;
List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
prepPolicyWithHeadroom();
int numIterations = 1000;
long tstart = System.currentTimeMillis();
for (int i = 0; i < numIterations; i++) {
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
validateSplit(response, resourceRequests);
}
long tend = System.currentTimeMillis();
LOG.info("Performed " + numIterations + " policy invocations (and "
+ "validations) in " + (tend - tstart) + "ms");
}
@Test
public void testFWDAllZeroANY() throws Exception {
// Tests how the headroom info are used to split based on the capacity
// each RM claims to give us.
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(0.5f);
initializePolicy();
List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
// this receives responses from sc0,sc1,sc2
prepPolicyWithHeadroom();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
// we expect all three to appear for a zero-sized ANY
// pretty print requests
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
// we expect the zero size request to be sent to the first 3 rm (due to
// the fact that we received responses only from these 3 sublcusters)
checkExpectedAllocation(response, "subcluster0", 1, 0);
checkExpectedAllocation(response, "subcluster1", 1, 0);
checkExpectedAllocation(response, "subcluster2", 1, 0);
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
checkExpectedAllocation(response, "subcluster5", -1, -1);
}
@Test
public void testSplitBasedOnHeadroomAndWeights() throws Exception {
// Tests how the headroom info are used to split based on the capacity
// each RM claims to give us.
// Configure policy to be 50% headroom based and 50% weight based
getPolicyInfo().setHeadroomAlpha(0.5f);
initializePolicy();
List<ResourceRequest> resourceRequests = createSimpleRequest();
prepPolicyWithHeadroom();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
// pretty print requests
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
// in this case the headroom allocates 50 containers, while weights allocate
// the rest. due to weights we have 12.5 (round to 13) containers for each
// sublcuster, the rest is due to headroom.
checkExpectedAllocation(response, "subcluster0", 1, 50);
checkExpectedAllocation(response, "subcluster1", 1, 13);
checkExpectedAllocation(response, "subcluster2", 1, 13);
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
checkExpectedAllocation(response, "subcluster5", 1, 25);
}
private void prepPolicyWithHeadroom() throws YarnException {
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
ar = getAllocateResponseWithTargetHeadroom(0);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar);
ar = getAllocateResponseWithTargetHeadroom(1);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
}
private AllocateResponse getAllocateResponseWithTargetHeadroom(
int numContainers) {
return AllocateResponse.newInstance(0, null, null,
Collections.<NodeReport> emptyList(),
Resource.newInstance(numContainers * 1024, numContainers), null, 10,
null, Collections.<NMToken> emptyList());
}
@Test
public void testSplitAllocateRequest() throws Exception {
// Test a complex List<ResourceRequest> is split correctly
initializePolicy();
// modify default initialization to include a "homesubcluster"
// which we will use as the default for when nodes or racks are unknown
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
getActiveSubclusters().put(getHomeSubCluster(), sci);
SubClusterIdInfo sc = new SubClusterIdInfo(getHomeSubCluster().getId());
getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
getActiveSubclusters());
List<ResourceRequest> resourceRequests = createComplexRequest();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
validateSplit(response, resourceRequests);
prettyPrintRequests(response);
// we expect 4 entry for home subcluster (3 for request-id 4, and a part
// of the broadcast of request-id 2
checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23);
// for subcluster0 we expect 3 entry from request-id 0, and 3 from
// request-id 3, as well as part of the request-id 2 broadast
checkExpectedAllocation(response, "subcluster0", 7, 26);
// we expect 5 entry for subcluster1 (4 from request-id 1, and part
// of the broadcast of request-id 2
checkExpectedAllocation(response, "subcluster1", 5, 25);
// sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the
// broadcast of request-id 2, and no request-id 0
checkExpectedAllocation(response, "subcluster2", 4, 23);
// subcluster id 3, 4 should not appear (due to weights or active/inactive)
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
// subcluster5 should get only part of the request-id 2 broadcast
checkExpectedAllocation(response, "subcluster5", 1, 20);
// check that the allocations that show up are what expected
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
Assert.assertTrue(rr.getAllocationRequestId() == 4L
|| rr.getAllocationRequestId() == 2L);
}
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
}
List<ResourceRequest> rrs =
response.get(SubClusterId.newInstance("subcluster0"));
for (ResourceRequest rr : rrs) {
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
}
for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster2"))) {
Assert.assertTrue(rr.getAllocationRequestId() != 0L);
}
for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster5"))) {
Assert.assertTrue(rr.getAllocationRequestId() >= 2);
Assert.assertTrue(rr.getRelaxLocality());
}
}
// check that the number of containers in the first ResourceRequest in
// response for this sub-cluster matches expectations. -1 indicate the
// response should be null
private void checkExpectedAllocation(
Map<SubClusterId, List<ResourceRequest>> response, String subCluster,
long totResourceRequests, long totContainers) {
if (totContainers == -1) {
Assert.assertNull(response.get(SubClusterId.newInstance(subCluster)));
} else {
SubClusterId sc = SubClusterId.newInstance(subCluster);
Assert.assertEquals(totResourceRequests, response.get(sc).size());
long actualContCount = 0;
for (ResourceRequest rr : response.get(sc)) {
actualContCount += rr.getNumContainers();
}
Assert.assertEquals(totContainers, actualContCount);
}
}
private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
List<ResourceRequest> original) throws YarnException {
SubClusterResolver resolver =
getFederationPolicyContext().getFederationSubclusterResolver();
// Apply general validation rules
int numUsedSubclusters = split.size();
Set<Long> originalIds = new HashSet<>();
Set<Long> splitIds = new HashSet<>();
int originalContainers = 0;
for (ResourceRequest rr : original) {
originalContainers += rr.getNumContainers();
originalIds.add(rr.getAllocationRequestId());
}
int splitContainers = 0;
for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split
.entrySet()) {
for (ResourceRequest rr : rrs.getValue()) {
splitContainers += rr.getNumContainers();
splitIds.add(rr.getAllocationRequestId());
// check node-local asks are sent to right RM (only)
SubClusterId fid = null;
try {
fid = resolver.getSubClusterForNode(rr.getResourceName());
} catch (YarnException e) {
// ignore code will handle
}
if (!rrs.getKey().equals(getHomeSubCluster()) && fid != null
&& !fid.equals(rrs.getKey())) {
Assert.fail("A node-local (or resolvable rack-local) RR should not "
+ "be send to an RM other than what it resolves to.");
}
}
}
// check we are not inventing Allocation Ids
Assert.assertEquals(originalIds, splitIds);
// check we are not exceedingly replicating the container asks among
// RMs (a little is allowed due to rounding of fractional splits)
Assert.assertTrue(
" Containers requested (" + splitContainers + ") should "
+ "not exceed the original count of containers ("
+ originalContainers + ") by more than the number of subclusters ("
+ numUsedSubclusters + ")",
originalContainers + numUsedSubclusters >= splitContainers);
// Test target Ids
for (SubClusterId targetId : split.keySet()) {
Assert.assertTrue("Target subclusters should be in the active set",
getActiveSubclusters().containsKey(targetId));
Assert.assertTrue(
"Target subclusters (" + targetId + ") should have weight >0 in "
+ "the policy ",
getPolicyInfo().getRouterPolicyWeights()
.get(new SubClusterIdInfo(targetId)) > 0);
}
}
private void prettyPrintRequests(
Map<SubClusterId, List<ResourceRequest>> response) {
for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
.entrySet()) {
String str = "";
for (ResourceRequest rr : entry.getValue()) {
str += " [id:" + rr.getAllocationRequestId() + " loc:"
+ rr.getResourceName() + " numCont:" + rr.getNumContainers()
+ "], ";
}
LOG.info(entry.getKey() + " --> " + str);
}
}
private List<ResourceRequest> createLargeRandomList(int numRR)
throws Exception {
List<ResourceRequest> out = new ArrayList<>();
Random rand = new Random(1);
DefaultSubClusterResolverImpl resolver =
(DefaultSubClusterResolverImpl) getFederationPolicyContext()
.getFederationSubclusterResolver();
List<String> nodes =
new ArrayList<>(resolver.getNodeToSubCluster().keySet());
for (int i = 0; i < numRR; i++) {
String nodeName = nodes.get(rand.nextInt(nodes.size()));
long allocationId = (long) rand.nextInt(20);
// create a single container request in sc0
out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId,
nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean()));
}
return out;
}
private List<ResourceRequest> createSimpleRequest() throws Exception {
List<ResourceRequest> out = new ArrayList<>();
// create a single container request in sc0
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
return out;
}
private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
List<ResourceRequest> out = new ArrayList<>();
// create a single container request in sc0
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 0, null, true));
return out;
}
private List<ResourceRequest> createComplexRequest() throws Exception {
List<ResourceRequest> out = new ArrayList<>();
// create a single container request in sc0
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
// create a single container request with 3 alternative hosts across sc1,sc2
// where we want 2 containers in sc1 and 1 in sc2
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster1-rack1-host1", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster1-rack1-host2", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster2-rack3-host3", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster1-rack1", 1024, 1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster2-rack3", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
ResourceRequest.ANY, 1024, 1, 1, 2, null, false));
// create a non-local ANY request that can span anything
out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
// create a single container request in sc0 with relaxed locality
out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, true));
out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
"subcluster0-rack0", 1024, 1, 1, 1, null, true));
out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, true));
// create a request of an unknown node/rack and expect this to show up
// in homesubcluster
out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode",
1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack",
1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
return out;
}
}

View File

@ -17,6 +17,9 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@ -29,12 +32,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the
* load is properly considered for allocation.
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
* is properly considered for allocation.
*/
public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
@ -47,12 +47,10 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
// simulate 20 active subclusters
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc =
new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterInfo federationSubClusterInfo =
SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
SubClusterState.SC_RUNNING, -1,
generateClusterMetricsInfo(i));
SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
float weight = getRand().nextInt(2);
if (i == 5) {
@ -76,7 +74,7 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
private String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
//plant a best cluster
// plant a best cluster
if (id == 5) {
mem = 1024 * 277 * 100;
}

View File

@ -16,6 +16,12 @@
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@ -28,12 +34,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Simple test class for the {@link PriorityRouterPolicy}. Tests that the
* weights are correctly used for ordering the choice of sub-clusters.
@ -72,8 +72,7 @@ public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest {
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(),
getActiveSubclusters());
getPolicyInfo(), getActiveSubclusters());
}

View File

@ -17,6 +17,13 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@ -29,13 +36,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large
* number of randomized tests to check we are weighiting correctly even if
@ -71,8 +71,7 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(),
getActiveSubclusters());
getPolicyInfo(), getActiveSubclusters());
}
@ -88,8 +87,8 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
float numberOfDraws = 1000000;
for (float i = 0; i < numberOfDraws; i++) {
SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()).
getHomeSubcluster(getApplicationSubmissionContext());
SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
counter.get(chosenId).incrementAndGet();
}
@ -113,13 +112,15 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
if (getActiveSubclusters().containsKey(counterEntry.getKey())) {
Assert.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ " expected weight: " + expectedWeight, expectedWeight == 0 ||
(actualWeight / expectedWeight) < 1.1
&& (actualWeight / expectedWeight) > 0.9);
+ " expected weight: " + expectedWeight,
expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1
&& (actualWeight / expectedWeight) > 0.9);
} else {
Assert.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ " expected weight: " + expectedWeight, actualWeight == 0);
Assert
.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: "
+ actualWeight + " expected weight: " + expectedWeight,
actualWeight == 0);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolv
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.*;
import org.apache.hadoop.yarn.util.Records;
import java.net.URL;
import java.nio.ByteBuffer;
@ -48,6 +50,68 @@ public final class FederationPoliciesTestUtil {
// disabled.
}
private static final String FEDR_NODE_PREFIX = "fedr-test-node-";
public static List<ResourceRequest> createResourceRequests(String[] hosts,
int memory, int vCores, int priority, int containers,
String labelExpression, boolean relaxLocality) throws YarnException {
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
for (String host : hosts) {
ResourceRequest hostReq =
createResourceRequest(host, memory, vCores, priority, containers,
labelExpression, relaxLocality);
reqs.add(hostReq);
ResourceRequest rackReq =
createResourceRequest("/default-rack", memory, vCores, priority,
containers, labelExpression, relaxLocality);
reqs.add(rackReq);
}
ResourceRequest offRackReq =
createResourceRequest(ResourceRequest.ANY, memory, vCores, priority,
containers, labelExpression, relaxLocality);
reqs.add(offRackReq);
return reqs;
}
protected static ResourceRequest createResourceRequest(String resource,
int memory, int vCores, int priority, int containers,
boolean relaxLocality) throws YarnException {
return createResourceRequest(resource, memory, vCores, priority, containers,
null, relaxLocality);
}
@SuppressWarnings("checkstyle:parameternumber")
public static ResourceRequest createResourceRequest(long id, String resource,
int memory, int vCores, int priority, int containers,
String labelExpression, boolean relaxLocality) throws YarnException {
ResourceRequest out =
createResourceRequest(resource, memory, vCores, priority, containers,
labelExpression, relaxLocality);
out.setAllocationRequestId(id);
return out;
}
public static ResourceRequest createResourceRequest(String resource,
int memory, int vCores, int priority, int containers,
String labelExpression, boolean relaxLocality) throws YarnException {
ResourceRequest req = Records.newRecord(ResourceRequest.class);
req.setResourceName(resource);
req.setNumContainers(containers);
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(priority);
req.setPriority(pri);
Resource capability = Records.newRecord(Resource.class);
capability.setMemorySize(memory);
capability.setVirtualCores(vCores);
req.setCapability(capability);
if (labelExpression != null) {
req.setNodeLabelExpression(labelExpression);
}
req.setRelaxLocality(relaxLocality);
return req;
}
public static void initializePolicyContext(
FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy

View File

@ -2,3 +2,7 @@ node1,subcluster1,rack1
node2 , subcluster2, RACK1
noDE3,subcluster3, rack2
node4, subcluster3, rack2
subcluster0-rack0-host0,subcluster0, subcluster0-rack0
Subcluster1-RACK1-HOST1,subcluster1, subCluster1-RACK1
SUBCLUSTER1-RACK1-HOST2,subcluster1, subCluster1-RACK1
SubCluster2-RACK3-HOST3,subcluster2, subcluster2-rack3