YARN-5324. Stateless Federation router policies implementation. (Carlo Curino via Subru).

(cherry picked from commit 1298127bda)
This commit is contained in:
Subru Krishnan 2016-09-22 17:06:57 -07:00 committed by Carlo Curino
parent 81472778d7
commit 0662996b6a
18 changed files with 1530 additions and 74 deletions

View File

@ -59,13 +59,12 @@ public class FederationPolicyInitializationContext {
/**
* Setter for the {@link SubClusterPolicyConfiguration}.
*
* @param federationPolicyConfiguration the
* {@link SubClusterPolicyConfiguration}
* to be used for initialization.
* @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration}
* to be used for initialization.
*/
public void setFederationPolicyConfiguration(
SubClusterPolicyConfiguration federationPolicyConfiguration) {
this.federationPolicyConfiguration = federationPolicyConfiguration;
public void setSubClusterPolicyConfiguration(
SubClusterPolicyConfiguration fedPolicyConfiguration) {
this.federationPolicyConfiguration = fedPolicyConfiguration;
}
/**

View File

@ -18,21 +18,29 @@
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router
.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
/**
* Implementors of this interface are capable to instantiate and (re)initalize
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} based on
* a {@link FederationPolicyInitializationContext}. The reason to bind these two
* policies together is to make sure we remain consistent across the router and
* amrmproxy policy decisions.
*
* Implementors need to provide the ability to serliaze a policy and its
* configuration as a {@link SubClusterPolicyConfiguration}, as well as
* provide (re)initialization mechanics for the underlying
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
*
* The serialization aspects are used by admin APIs or a policy engine to
* store a serialized configuration in the {@code FederationStateStore},
* while the getters methods are used to obtain a propertly inizialized
* policy in the {@code Router} and {@code AMRMProxy} respectively.
*
* This interface by design binds together
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and
* provide lifecycle support for serialization and deserialization, to reduce
* configuration mistakes (combining incompatible policies).
*
*/
public interface FederationPolicyConfigurator {
public interface FederationPolicyManager {
/**
* If the current instance is compatible, this method returns the same
@ -88,4 +96,31 @@ public interface FederationPolicyConfigurator {
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException;
/**
* This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
* This is to be used when writing a policy object in the federation policy
* store.
*
* @return a valid policy configuration representing this object
* parametrization.
*
* @throws FederationPolicyInitializationException if the current state cannot
* be serialized properly
*/
SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException;
/**
* This method returns the queue this policy is configured for.
* @return the name of the queue.
*/
String getQueue();
/**
* This methods provides a setter for the queue this policy is specified for.
* @param queue the name of the queue.
*/
void setQueue(String queue);
}

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
/**
* Implementors of this class are able to serializeConf the configuraiton of a
* policy as a {@link SubClusterPolicyConfiguration}. This is used during the
* lifetime of a policy from the admin APIs or policy engine to serializeConf
* the policy into the policy store.
*/
public interface FederationPolicyWriter {
/**
/**
* This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
* This is to be used when writing a policy object in the federation policy
* store.
*
* @return a valid policy configuration representing this object
* parametrization.
*
* @throws FederationPolicyInitializationException if the current state cannot
* be serialized properly
*/
SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException;
}

View File

@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.dao;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONMarshaller;
import com.sun.jersey.api.json.JSONUnmarshaller;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
/**
* This is a DAO class for the configuration of parameteres for federation
* policies. This generalizes several possible configurations as two lists of
* {@link SubClusterIdInfo} and corresponding weights as a
* {@link Float}. The interpretation of the weight is left to the logic in
* the policy.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@XmlRootElement(name = "federation-policy")
@XmlAccessorType(XmlAccessType.FIELD)
public class WeightedPolicyInfo {
private static final Logger LOG =
LoggerFactory.getLogger(WeightedPolicyInfo.class);
private Map<SubClusterIdInfo, Float> routerPolicyWeights = new HashMap<>();
private Map<SubClusterIdInfo, Float> amrmPolicyWeights = new HashMap<>();
private float headroomAlpha;
private static JSONJAXBContext jsonjaxbContext = initContext();
private static JSONJAXBContext initContext() {
try {
return new JSONJAXBContext(JSONConfiguration.DEFAULT,
WeightedPolicyInfo.class);
} catch (JAXBException e) {
LOG.error("Error parsing the policy.", e);
}
return null;
}
public WeightedPolicyInfo() {
//JAXB needs this
}
/**
* Setter method for Router weights.
*
* @param policyWeights the router weights.
*/
public void setRouterPolicyWeights(
Map<SubClusterIdInfo, Float> policyWeights) {
this.routerPolicyWeights = policyWeights;
}
/**
* Setter method for ARMRMProxy weights.
*
* @param policyWeights the amrmproxy weights.
*/
public void setAMRMPolicyWeights(
Map<SubClusterIdInfo, Float> policyWeights) {
this.amrmPolicyWeights = policyWeights;
}
/**
* Getter of the router weights.
* @return the router weights.
*/
public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
return routerPolicyWeights;
}
/**
* Getter for AMRMProxy weights.
* @return the AMRMProxy weights.
*/
public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
return amrmPolicyWeights;
}
/**
* Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON
* representation.
*
* @param bb the input byte representation.
*
* @return the {@link WeightedPolicyInfo} represented.
*
* @throws FederationPolicyInitializationException if a deserializaiton error
* occurs.
*/
public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
throw new FederationPolicyInitializationException("JSONJAXBContext should"
+ " not be null.");
}
try {
JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
final byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
String params = new String(bytes, Charset.forName("UTF-8"));
WeightedPolicyInfo weightedPolicyInfo = unmarshaller
.unmarshalFromJSON(new StringReader(params),
WeightedPolicyInfo.class);
return weightedPolicyInfo;
} catch (JAXBException j) {
throw new FederationPolicyInitializationException(j);
}
}
/**
* Converts the policy into a byte array representation in the input {@link
* ByteBuffer}.
*
* @return byte array representation of this policy configuration.
*
* @throws FederationPolicyInitializationException if a serialization error
* occurs.
*/
public ByteBuffer toByteBuffer()
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
throw new FederationPolicyInitializationException("JSONJAXBContext should"
+ " not be null.");
}
try {
String s = toJSONString();
return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8")));
} catch (JAXBException j) {
throw new FederationPolicyInitializationException(j);
}
}
private String toJSONString() throws JAXBException {
JSONMarshaller marshaller = jsonjaxbContext.createJSONMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
StringWriter sw = new StringWriter(256);
marshaller.marshallToJSON(this, sw);
return sw.toString();
}
@Override
public boolean equals(Object other) {
if (other == null || !other.getClass().equals(this.getClass())) {
return false;
}
WeightedPolicyInfo otherPolicy =
(WeightedPolicyInfo) other;
Map<SubClusterIdInfo, Float> otherAMRMWeights =
otherPolicy.getAMRMPolicyWeights();
Map<SubClusterIdInfo, Float> otherRouterWeights =
otherPolicy.getRouterPolicyWeights();
boolean amrmWeightsMatch = otherAMRMWeights != null &&
getAMRMPolicyWeights() != null &&
CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
getAMRMPolicyWeights().entrySet());
boolean routerWeightsMatch = otherRouterWeights != null &&
getRouterPolicyWeights() != null &&
CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
getRouterPolicyWeights().entrySet());
return amrmWeightsMatch && routerWeightsMatch;
}
@Override
public int hashCode() {
return 31 * amrmPolicyWeights.hashCode() + routerPolicyWeights.hashCode();
}
/**
* Return the parameter headroomAlpha, used by policies that balance
* weight-based and load-based considerations in their decisions.
*
* For policies that use this parameter, values close to 1 indicate that
* most of the decision should be based on currently observed headroom from
* various sub-clusters, values close to zero, indicate that the decision
* should be mostly based on weights and practically ignore current load.
*
* @return the value of headroomAlpha.
*/
public float getHeadroomAlpha() {
return headroomAlpha;
}
/**
* Set the parameter headroomAlpha, used by policies that balance
* weight-based and load-based considerations in their decisions.
*
* For policies that use this parameter, values close to 1 indicate that
* most of the decision should be based on currently observed headroom from
* various sub-clusters, values close to zero, indicate that the decision
* should be mostly based on weights and practically ignore current load.
*
* @param headroomAlpha the value to use for balancing.
*/
public void setHeadroomAlpha(float headroomAlpha) {
this.headroomAlpha = headroomAlpha;
}
@Override
public String toString() {
try {
return toJSONString();
} catch (JAXBException e) {
e.printStackTrace();
return "Error serializing to string.";
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** DAO objects for serializing/deserializing policy configurations. **/
package org.apache.hadoop.yarn.server.federation.policies.dao;

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.Map;
/**
* Abstract class provides common validation of reinitialize(), for all
* policies that are "weight-based".
*/
public abstract class BaseWeightedRouterPolicy
implements FederationRouterPolicy {
private WeightedPolicyInfo policyInfo = null;
private FederationPolicyInitializationContext policyContext;
public BaseWeightedRouterPolicy() {
}
@Override
public void reinitialize(FederationPolicyInitializationContext
federationPolicyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
// perform consistency checks
WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo
.fromByteBuffer(
federationPolicyContext.getSubClusterPolicyConfiguration()
.getParams());
// if nothing has changed skip the rest of initialization
if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
return;
}
validate(newPolicyInfo);
setPolicyInfo(newPolicyInfo);
this.policyContext = federationPolicyContext;
}
/**
* Overridable validation step for the policy configuration.
* @param newPolicyInfo the configuration to test.
* @throws FederationPolicyInitializationException if the configuration is
* not valid.
*/
public void validate(WeightedPolicyInfo newPolicyInfo) throws
FederationPolicyInitializationException {
if (newPolicyInfo == null) {
throw new FederationPolicyInitializationException("The policy to "
+ "validate should not be null.");
}
Map<SubClusterIdInfo, Float> newWeights =
newPolicyInfo.getRouterPolicyWeights();
if (newWeights == null || newWeights.size() < 1) {
throw new FederationPolicyInitializationException(
"Weight vector cannot be null/empty.");
}
}
/**
* Getter method for the configuration weights.
*
* @return the {@link WeightedPolicyInfo} representing the policy
* configuration.
*/
public WeightedPolicyInfo getPolicyInfo() {
return policyInfo;
}
/**
* Setter method for the configuration weights.
*
* @param policyInfo the {@link WeightedPolicyInfo} representing the policy
* configuration.
*/
public void setPolicyInfo(
WeightedPolicyInfo policyInfo) {
this.policyInfo = policyInfo;
}
/**
* Getter method for the {@link FederationPolicyInitializationContext}.
* @return the context for this policy.
*/
public FederationPolicyInitializationContext getPolicyContext() {
return policyContext;
}
/**
* Setter method for the {@link FederationPolicyInitializationContext}.
* @param policyContext the context to assign to this policy.
*/
public void setPolicyContext(
FederationPolicyInitializationContext policyContext) {
this.policyContext = policyContext;
}
/**
* This methods gets active subclusters map from the {@code
* FederationStateStoreFacade} and validate it not being null/empty.
*
* @return the map of ids to info for all active subclusters.
* @throws YarnException if we can't get the list.
*/
protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext()
.getFederationStateStoreFacade().getSubClusters(true);
if (activeSubclusters == null || activeSubclusters.size() < 1) {
throw new NoActiveSubclustersException(
"Zero active subclusters, cannot pick where to send job.");
}
return activeSubclusters;
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.util.Map;
/**
* This implements a simple load-balancing policy. The policy "weights" are
* binary 0/1 values that enable/disable each sub-cluster, and the policy peaks
* the sub-cluster with the least load to forward this application.
*/
public class LoadBasedRouterPolicy
extends BaseWeightedRouterPolicy {
private static final Log LOG =
LogFactory.getLog(LoadBasedRouterPolicy.class);
@Override
public void reinitialize(FederationPolicyInitializationContext
federationPolicyContext)
throws FederationPolicyInitializationException {
// remember old policyInfo
WeightedPolicyInfo tempPolicy = getPolicyInfo();
//attempt new initialization
super.reinitialize(federationPolicyContext);
//check extra constraints
for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) {
if (weight != 0 && weight != 1) {
//reset to old policyInfo if check fails
setPolicyInfo(tempPolicy);
throw new FederationPolicyInitializationException(
this.getClass().getCanonicalName()
+ " policy expects all weights to be either "
+ "\"0\" or \"1\"");
}
}
}
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
.getRouterPolicyWeights();
SubClusterIdInfo chosen = null;
long currBestMem = -1;
for (Map.Entry<SubClusterId, SubClusterInfo> entry :
activeSubclusters
.entrySet()) {
SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
if (weights.containsKey(id) && weights.get(id) > 0) {
long availableMemory = getAvailableMemory(entry.getValue());
if (availableMemory > currBestMem) {
currBestMem = availableMemory;
chosen = id;
}
}
}
return chosen.toId();
}
private long getAvailableMemory(SubClusterInfo value)
throws YarnException {
try {
long mem = -1;
JSONObject obj = new JSONObject(value.getCapability());
mem = obj.getJSONObject("clusterMetrics").getLong("availableMB");
return mem;
} catch (JSONException j) {
throw new YarnException("FederationSubCluserInfo cannot be parsed", j);
}
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.Map;
/**
* This implements a policy that interprets "weights" as a ordered list of
* preferences among sub-clusters. Highest weight among active subclusters is
* chosen.
*/
public class PriorityRouterPolicy
extends BaseWeightedRouterPolicy {
private static final Log LOG =
LogFactory.getLog(PriorityRouterPolicy.class);
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
// This finds the sub-cluster with the highest weight among the
// currently active ones.
Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
.getRouterPolicyWeights();
SubClusterId chosen = null;
Float currentBest = Float.MIN_VALUE;
for (SubClusterId id : activeSubclusters.keySet()) {
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
currentBest = weights.get(idInfo);
chosen = id;
}
}
return chosen;
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* This simple policy picks at uniform random among any of the currently active
* subclusters. This policy is easy to use and good for testing.
*
* NOTE: this is "almost" subsumed by the {@code WeightedRandomRouterPolicy}.
* Behavior only diverges when there are active sub-clusters that are not part
* of the "weights", in which case the {@link UniformRandomRouterPolicy} send
* load to them, while {@code WeightedRandomRouterPolicy} does not.
*/
public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
private Random rand;
public UniformRandomRouterPolicy() {
rand = new Random(System.currentTimeMillis());
}
@Override
public void reinitialize(
FederationPolicyInitializationContext federationPolicyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
//note: this overrides BaseWeighterRouterPolicy and ignores the weights
setPolicyContext(federationPolicyContext);
}
/**
* Simply picks a random active subcluster to start the AM (this does NOT
* depend on the weights in the policy).
*
* @param appSubmissionContext the context for the app being submitted
* (ignored).
*
* @return a randomly chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
List<SubClusterId> list =
new ArrayList<>(activeSubclusters.keySet());
return list.get(rand.nextInt(list.size()));
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import java.util.Map;
import java.util.Random;
/**
* This policy implements a weighted random sample among currently active
* sub-clusters.
*/
public class WeightedRandomRouterPolicy
extends BaseWeightedRouterPolicy {
private static final Log LOG =
LogFactory.getLog(WeightedRandomRouterPolicy.class);
private Random rand = new Random(System.currentTimeMillis());
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// changes dynamically (and this would unfairly spread the load to
// sub-clusters adjacent to an inactive one), hence we need to count/scan
// the list and based on weight pick the next sub-cluster.
Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
.getRouterPolicyWeights();
float totActiveWeight = 0;
for(Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()){
if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey()
.toId())){
totActiveWeight += entry.getValue();
}
}
float lookupValue = rand.nextFloat() * totActiveWeight;
for (SubClusterId id : activeSubclusters.keySet()) {
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (weights.containsKey(idInfo)) {
lookupValue -= weights.get(idInfo);
}
if (lookupValue <= 0) {
return id;
}
}
//should never happen
return null;
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/**
* This class represent a sub-cluster identifier in the JSON representation
* of the policy configuration.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@XmlRootElement(name = "federation-policy")
@XmlAccessorType(XmlAccessType.FIELD)
public class SubClusterIdInfo {
private String id;
public SubClusterIdInfo() {
//JAXB needs this
}
public SubClusterIdInfo(String subClusterId) {
this.id = subClusterId;
}
public SubClusterIdInfo(SubClusterId subClusterId) {
this.id = subClusterId.getId();
}
/**
* Get the sub-cluster identifier as {@link SubClusterId}.
* @return the sub-cluster id.
*/
public SubClusterId toId() {
return SubClusterId.newInstance(id);
}
@Override
public boolean equals(Object other) {
if (other instanceof SubClusterIdInfo) {
if (((SubClusterIdInfo) other).id.equals(this.id)) {
return true;
}
}
return false;
}
@Override
public int hashCode() {
return id.hashCode();
}
}

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Test;
/**
* Base class for policies tests, tests for common reinitialization cases.
*/
public abstract class BaseFederationPoliciesTest {
private ConfigurableFederationPolicy policy;
private WeightedPolicyInfo policyInfo;
private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
private FederationPolicyInitializationContext federationPolicyContext;
private ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
private Random rand = new Random();
@Test
public void testReinitilialize() throws YarnException {
FederationPolicyInitializationContext fpc =
new FederationPolicyInitializationContext();
ByteBuffer buf = getPolicyInfo().toByteBuffer();
fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
.newInstance("queue1", getPolicy().getClass().getCanonicalName(), buf));
fpc.setFederationSubclusterResolver(
FederationPoliciesTestUtil.initResolver());
fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
getPolicy().reinitialize(fpc);
}
@Test(expected = FederationPolicyInitializationException.class)
public void testReinitilializeBad1() throws YarnException {
getPolicy().reinitialize(null);
}
@Test(expected = FederationPolicyInitializationException.class)
public void testReinitilializeBad2() throws YarnException {
FederationPolicyInitializationContext fpc =
new FederationPolicyInitializationContext();
getPolicy().reinitialize(fpc);
}
@Test(expected = FederationPolicyInitializationException.class)
public void testReinitilializeBad3() throws YarnException {
FederationPolicyInitializationContext fpc =
new FederationPolicyInitializationContext();
ByteBuffer buf = mock(ByteBuffer.class);
fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
.newInstance("queue1", "WrongPolicyName", buf));
fpc.setFederationSubclusterResolver(
FederationPoliciesTestUtil.initResolver());
fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
getPolicy().reinitialize(fpc);
}
@Test(expected = NoActiveSubclustersException.class)
public void testNoSubclusters() throws YarnException {
// empty the activeSubclusters map
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), new HashMap<>());
ConfigurableFederationPolicy currentPolicy = getPolicy();
if (currentPolicy instanceof FederationRouterPolicy) {
((FederationRouterPolicy) currentPolicy)
.getHomeSubcluster(getApplicationSubmissionContext());
}
}
public ConfigurableFederationPolicy getPolicy() {
return policy;
}
public void setPolicy(ConfigurableFederationPolicy policy) {
this.policy = policy;
}
public WeightedPolicyInfo getPolicyInfo() {
return policyInfo;
}
public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
this.policyInfo = policyInfo;
}
public Map<SubClusterId, SubClusterInfo> getActiveSubclusters() {
return activeSubclusters;
}
public void setActiveSubclusters(
Map<SubClusterId, SubClusterInfo> activeSubclusters) {
this.activeSubclusters = activeSubclusters;
}
public FederationPolicyInitializationContext getFederationPolicyContext() {
return federationPolicyContext;
}
public void setFederationPolicyContext(
FederationPolicyInitializationContext federationPolicyContext) {
this.federationPolicyContext = federationPolicyContext;
}
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return applicationSubmissionContext;
}
public void setApplicationSubmissionContext(
ApplicationSubmissionContext applicationSubmissionContext) {
this.applicationSubmissionContext = applicationSubmissionContext;
}
public Random getRand() {
return rand;
}
public void setRand(Random rand) {
this.rand = rand;
}
}

View File

@ -77,7 +77,7 @@ public class TestFederationPolicyInitializationContextValidator {
@Test(expected = FederationPolicyInitializationException.class)
public void nullConf() throws Exception {
context.setFederationPolicyConfiguration(null);
context.setSubClusterPolicyConfiguration(null);
FederationPolicyInitializationContextValidator.validate(context,
MockPolicyManager.class.getCanonicalName());
}
@ -96,8 +96,8 @@ public class TestFederationPolicyInitializationContextValidator {
MockPolicyManager.class.getCanonicalName());
}
private class MockPolicyManager
implements FederationPolicyWriter, FederationPolicyConfigurator {
private class MockPolicyManager implements FederationPolicyManager {
@Override
public FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext
@ -123,6 +123,17 @@ public class TestFederationPolicyInitializationContextValidator {
return SubClusterPolicyConfiguration
.newInstance("queue1", this.getClass().getCanonicalName(), buf);
}
@Override
public String getQueue() {
return "default";
}
@Override
public void setQueue(String queue) {
}
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the
* load is properly considered for allocation.
*/
public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
@Before
public void setUp() throws Exception {
setPolicy(new LoadBasedRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// simulate 20 active subclusters
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc =
new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterInfo federationSubClusterInfo =
SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
SubClusterState.SC_RUNNING, -1,
generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
float weight = getRand().nextInt(2);
if (i == 5) {
weight = 1.0f;
}
// 5% chance we omit one of the weights
if (i <= 5 || getRand().nextFloat() > 0.05f) {
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
}
private String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
//plant a best cluster
if (id == 5) {
mem = 1024 * 277 * 100;
}
String clusterMetrics =
"{\"clusterMetrics\":{\"appsSubmitted\":65," + "\"appsCompleted\":64,"
+ "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0,"
+ "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + mem + ","
+ "\"allocatedMB\":0,\"reservedVirtualCores\":0,"
+ "\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0,"
+ "\"containersAllocated\":0,\"containersReserved\":0,"
+ "\"containersPending\":0,\"totalMB\":28364800,"
+ "\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1,"
+ "\"unhealthyNodes\":0,\"decommissionedNodes\":0,"
+ "\"rebootedNodes\":0,\"activeNodes\":277}}\n";
return clusterMetrics;
}
@Test
public void testLoadIsRespected() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
// check the "planted" best cluster is chosen
Assert.assertEquals("sc05", chosen.getId());
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Simple test class for the {@link PriorityRouterPolicy}. Tests that the
* weights are correctly used for ordering the choice of sub-clusters.
*/
public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest {
@Before
public void setUp() throws Exception {
setPolicy(new PriorityRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// simulate 20 subclusters with a 5% chance of being inactive
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
// with 5% omit a subcluster
if (getRand().nextFloat() < 0.95f || i == 5) {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
float weight = getRand().nextFloat();
if (i == 5) {
weight = 1.1f; // guaranteed to be the largest.
}
// 5% chance we omit one of the weights
if (i <= 5 || getRand().nextFloat() > 0.05f) {
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(),
getActiveSubclusters());
}
@Test
public void testPickLowestWeight() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
Assert.assertEquals("sc5", chosen.getId());
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one
* of the active subcluster is chosen.
*/
public class TestUniformRandomRouterPolicy extends BaseFederationPoliciesTest {
@Before
public void setUp() throws Exception {
setPolicy(new UniformRandomRouterPolicy());
// needed for base test to work
setPolicyInfo(mock(WeightedPolicyInfo.class));
for (int i = 1; i <= 2; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
mock(WeightedPolicyInfo.class), getActiveSubclusters());
}
@Test
public void testOneSubclusterIsChosen() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen));
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large
* number of randomized tests to check we are weighiting correctly even if
* clusters go inactive.
*/
public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
@Before
public void setUp() throws Exception {
setPolicy(new WeightedRandomRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// simulate 20 subclusters with a 5% chance of being inactive
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
// with 5% omit a subcluster
if (getRand().nextFloat() < 0.95f) {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
// 5% chance we omit one of the weights
float weight = getRand().nextFloat();
if (i <= 5 || getRand().nextFloat() > 0.05f) {
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(),
getActiveSubclusters());
}
@Test
public void testClusterChosenWithRightProbability() throws YarnException {
Map<SubClusterId, AtomicLong> counter = new HashMap<>();
for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights()
.keySet()) {
counter.put(id.toId(), new AtomicLong(0));
}
float numberOfDraws = 1000000;
for (float i = 0; i < numberOfDraws; i++) {
SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()).
getHomeSubcluster(getApplicationSubmissionContext());
counter.get(chosenId).incrementAndGet();
}
float totalActiveWeight = 0;
for (SubClusterId id : getActiveSubclusters().keySet()) {
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (getPolicyInfo().getRouterPolicyWeights().containsKey(idInfo)) {
totalActiveWeight +=
getPolicyInfo().getRouterPolicyWeights().get(idInfo);
}
}
for (Map.Entry<SubClusterId, AtomicLong> counterEntry : counter
.entrySet()) {
float expectedWeight = getPolicyInfo().getRouterPolicyWeights()
.get(new SubClusterIdInfo(counterEntry.getKey())) / totalActiveWeight;
float actualWeight = counterEntry.getValue().floatValue() / numberOfDraws;
// make sure that the weights is respected among active subclusters
// and no jobs are routed to inactive subclusters.
if (getActiveSubclusters().containsKey(counterEntry.getKey())) {
Assert.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ " expected weight: " + expectedWeight, expectedWeight == 0 ||
(actualWeight / expectedWeight) < 1.1
&& (actualWeight / expectedWeight) > 0.9);
} else {
Assert.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ " expected weight: " + expectedWeight, actualWeight == 0);
}
}
}
}

View File

@ -19,13 +19,20 @@ package org.apache.hadoop.yarn.server.federation.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.*;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@ -41,6 +48,41 @@ public final class FederationPoliciesTestUtil {
// disabled.
}
public static void initializePolicyContext(
FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy
policy, WeightedPolicyInfo policyInfo,
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
ByteBuffer buf = policyInfo.toByteBuffer();
fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
.newInstance("queue1", policy.getClass().getCanonicalName(), buf));
FederationStateStoreFacade facade = FederationStateStoreFacade
.getInstance();
FederationStateStore fss = mock(FederationStateStore.class);
if (activeSubclusters == null) {
activeSubclusters = new HashMap<SubClusterId, SubClusterInfo>();
}
GetSubClustersInfoResponse response = GetSubClustersInfoResponse
.newInstance(new ArrayList<SubClusterInfo>(activeSubclusters.values()));
when(fss.getSubClusters(any())).thenReturn(response);
facade.reinitialize(fss, new Configuration());
fpc.setFederationStateStoreFacade(facade);
policy.reinitialize(fpc);
}
public static void initializePolicyContext(
ConfigurableFederationPolicy policy,
WeightedPolicyInfo policyInfo, Map<SubClusterId,
SubClusterInfo> activeSubclusters) throws YarnException {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(null, initResolver(),
initFacade());
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
}
/**
* Initialize a {@link SubClusterResolver}.
*
@ -62,6 +104,45 @@ public final class FederationPoliciesTestUtil {
return resolver;
}
/**
* Initialiaze a main-memory {@link FederationStateStoreFacade} used for
* testing, wiht a mock resolver.
*
* @param subClusterInfos the list of subclusters to be served on
* getSubClusters invocations.
*
* @return the facade.
*
* @throws YarnException in case the initialization is not successful.
*/
public static FederationStateStoreFacade initFacade(
List<SubClusterInfo> subClusterInfos, SubClusterPolicyConfiguration
policyConfiguration) throws YarnException {
FederationStateStoreFacade goodFacade = FederationStateStoreFacade
.getInstance();
FederationStateStore fss = mock(FederationStateStore.class);
GetSubClustersInfoResponse response = GetSubClustersInfoResponse
.newInstance(subClusterInfos);
when(fss.getSubClusters(any())).thenReturn(response);
List<SubClusterPolicyConfiguration> configurations = new ArrayList<>();
configurations.add(policyConfiguration);
GetSubClusterPoliciesConfigurationsResponse policiesResponse =
GetSubClusterPoliciesConfigurationsResponse
.newInstance(configurations);
when(fss.getPoliciesConfigurations(any())).thenReturn(policiesResponse);
GetSubClusterPolicyConfigurationResponse policyResponse =
GetSubClusterPolicyConfigurationResponse
.newInstance(policyConfiguration);
when(fss.getPolicyConfiguration(any())).thenReturn(policyResponse);
goodFacade.reinitialize(fss, new Configuration());
return goodFacade;
}
/**
* Initialiaze a main-memory {@link FederationStateStoreFacade} used for
* testing, wiht a mock resolver.
@ -71,13 +152,8 @@ public final class FederationPoliciesTestUtil {
* @throws YarnException in case the initialization is not successful.
*/
public static FederationStateStoreFacade initFacade() throws YarnException {
FederationStateStoreFacade goodFacade = FederationStateStoreFacade
.getInstance();
FederationStateStore fss = mock(FederationStateStore.class);
GetSubClustersInfoResponse response = GetSubClustersInfoResponse
.newInstance(new ArrayList<>());
when(fss.getSubClusters(any())).thenReturn(response);
goodFacade.reinitialize(fss, new Configuration());
return goodFacade;
return initFacade(new ArrayList<>(), mock(SubClusterPolicyConfiguration
.class));
}
}