diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java similarity index 67% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java index e888979e088..4cb9bbe5e49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,82 +16,87 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.federation.policies.router; +package org.apache.hadoop.yarn.server.federation.policies; + +import java.util.Map; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import java.util.Map; - /** - * Abstract class provides common validation of reinitialize(), for all - * policies that are "weight-based". + * Base abstract class for a weighted {@link ConfigurableFederationPolicy}. */ -public abstract class BaseWeightedRouterPolicy - implements FederationRouterPolicy { +public abstract class AbstractConfigurableFederationPolicy + implements ConfigurableFederationPolicy { private WeightedPolicyInfo policyInfo = null; private FederationPolicyInitializationContext policyContext; + private boolean isDirty; - public BaseWeightedRouterPolicy() { + public AbstractConfigurableFederationPolicy() { } @Override - public void reinitialize(FederationPolicyInitializationContext - federationPolicyContext) + public void reinitialize( + FederationPolicyInitializationContext initializationContext) throws FederationPolicyInitializationException { + isDirty = true; FederationPolicyInitializationContextValidator - .validate(federationPolicyContext, this.getClass().getCanonicalName()); + .validate(initializationContext, this.getClass().getCanonicalName()); // perform consistency checks - WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo - .fromByteBuffer( - federationPolicyContext.getSubClusterPolicyConfiguration() - .getParams()); + WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo.fromByteBuffer( + initializationContext.getSubClusterPolicyConfiguration().getParams()); // if nothing has changed skip the rest of initialization + // and signal to childs that the reinit is free via isDirty var. if (policyInfo != null && policyInfo.equals(newPolicyInfo)) { + isDirty = false; return; } validate(newPolicyInfo); setPolicyInfo(newPolicyInfo); - this.policyContext = federationPolicyContext; + this.policyContext = initializationContext; } /** * Overridable validation step for the policy configuration. + * * @param newPolicyInfo the configuration to test. - * @throws FederationPolicyInitializationException if the configuration is - * not valid. + * + * @throws FederationPolicyInitializationException if the configuration is not + * valid. */ - public void validate(WeightedPolicyInfo newPolicyInfo) throws - FederationPolicyInitializationException { + public void validate(WeightedPolicyInfo newPolicyInfo) + throws FederationPolicyInitializationException { if (newPolicyInfo == null) { - throw new FederationPolicyInitializationException("The policy to " - + "validate should not be null."); - } - Map newWeights = - newPolicyInfo.getRouterPolicyWeights(); - if (newWeights == null || newWeights.size() < 1) { throw new FederationPolicyInitializationException( - "Weight vector cannot be null/empty."); + "The policy to " + "validate should not be null."); } } + /** + * Returns true whether the last reinitialization requires actual changes, or + * was "free" as the weights have not changed. This is used by subclasses + * overriding reinitialize and calling super.reinitialize() to know wheter to + * quit early. + * + * @return whether more work is needed to initialize. + */ + public boolean getIsDirty() { + return isDirty; + } /** * Getter method for the configuration weights. * * @return the {@link WeightedPolicyInfo} representing the policy - * configuration. + * configuration. */ public WeightedPolicyInfo getPolicyInfo() { return policyInfo; @@ -101,15 +106,15 @@ public WeightedPolicyInfo getPolicyInfo() { * Setter method for the configuration weights. * * @param policyInfo the {@link WeightedPolicyInfo} representing the policy - * configuration. + * configuration. */ - public void setPolicyInfo( - WeightedPolicyInfo policyInfo) { + public void setPolicyInfo(WeightedPolicyInfo policyInfo) { this.policyInfo = policyInfo; } /** * Getter method for the {@link FederationPolicyInitializationContext}. + * * @return the context for this policy. */ public FederationPolicyInitializationContext getPolicyContext() { @@ -118,6 +123,7 @@ public FederationPolicyInitializationContext getPolicyContext() { /** * Setter method for the {@link FederationPolicyInitializationContext}. + * * @param policyContext the context to assign to this policy. */ public void setPolicyContext( @@ -130,13 +136,14 @@ public void setPolicyContext( * FederationStateStoreFacade} and validate it not being null/empty. * * @return the map of ids to info for all active subclusters. + * * @throws YarnException if we can't get the list. */ protected Map getActiveSubclusters() throws YarnException { - Map activeSubclusters = getPolicyContext() - .getFederationStateStoreFacade().getSubClusters(true); + Map activeSubclusters = + getPolicyContext().getFederationStateStoreFacade().getSubClusters(true); if (activeSubclusters == null || activeSubclusters.size() < 1) { throw new NoActiveSubclustersException( @@ -145,6 +152,4 @@ protected Map getActiveSubclusters() return activeSubclusters; } - - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java index fd6ceeab755..524577205a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java @@ -31,14 +31,11 @@ public interface ConfigurableFederationPolicy { * policies. The implementor should provide try-n-swap semantics, and retain * state if possible. * - * @param federationPolicyInitializationContext the new context to provide to - * implementor. + * @param policyContext the new context to provide to implementor. * * @throws FederationPolicyInitializationException in case the initialization - * fails. + * fails. */ - void reinitialize( - FederationPolicyInitializationContext - federationPolicyInitializationContext) + void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java index 9347fd0a0dc..46dd6eb5599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -30,6 +31,7 @@ public class FederationPolicyInitializationContext { private SubClusterPolicyConfiguration federationPolicyConfiguration; private SubClusterResolver federationSubclusterResolver; private FederationStateStoreFacade federationStateStoreFacade; + private SubClusterId homeSubcluster; public FederationPolicyInitializationContext() { federationPolicyConfiguration = null; @@ -37,20 +39,19 @@ public FederationPolicyInitializationContext() { federationStateStoreFacade = null; } - public FederationPolicyInitializationContext(SubClusterPolicyConfiguration - policy, SubClusterResolver resolver, FederationStateStoreFacade - storeFacade) { + public FederationPolicyInitializationContext( + SubClusterPolicyConfiguration policy, SubClusterResolver resolver, + FederationStateStoreFacade storeFacade) { this.federationPolicyConfiguration = policy; this.federationSubclusterResolver = resolver; this.federationStateStoreFacade = storeFacade; } - /** * Getter for the {@link SubClusterPolicyConfiguration}. * * @return the {@link SubClusterPolicyConfiguration} to be used for - * initialization. + * initialization. */ public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() { return federationPolicyConfiguration; @@ -59,8 +60,8 @@ public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() { /** * Setter for the {@link SubClusterPolicyConfiguration}. * - * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} - * to be used for initialization. + * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} to + * be used for initialization. */ public void setSubClusterPolicyConfiguration( SubClusterPolicyConfiguration fedPolicyConfiguration) { @@ -80,7 +81,7 @@ public SubClusterResolver getFederationSubclusterResolver() { * Setter for the {@link SubClusterResolver}. * * @param federationSubclusterResolver the {@link SubClusterResolver} to be - * used for initialization. + * used for initialization. */ public void setFederationSubclusterResolver( SubClusterResolver federationSubclusterResolver) { @@ -105,4 +106,24 @@ public void setFederationStateStoreFacade( FederationStateStoreFacade federationStateStoreFacade) { this.federationStateStoreFacade = federationStateStoreFacade; } + + /** + * Returns the current home sub-cluster. Useful for default policy behaviors. + * + * @return the home sub-cluster. + */ + public SubClusterId getHomeSubcluster() { + return homeSubcluster; + } + + /** + * Sets in the context the home sub-cluster. Useful for default policy + * behaviors. + * + * @param homeSubcluster value to set. + */ + public void setHomeSubcluster(SubClusterId homeSubcluster) { + this.homeSubcluster = homeSubcluster; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java index 31f83d4afaf..1b83bbc8602 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java @@ -25,50 +25,44 @@ public final class FederationPolicyInitializationContextValidator { private FederationPolicyInitializationContextValidator() { - //disable constructor per checkstyle + // disable constructor per checkstyle } public static void validate( - FederationPolicyInitializationContext - federationPolicyInitializationContext, - String myType) throws FederationPolicyInitializationException { + FederationPolicyInitializationContext policyContext, String myType) + throws FederationPolicyInitializationException { if (myType == null) { - throw new FederationPolicyInitializationException("The myType parameter" - + " should not be null."); + throw new FederationPolicyInitializationException( + "The myType parameter" + " should not be null."); } - if (federationPolicyInitializationContext == null) { + if (policyContext == null) { throw new FederationPolicyInitializationException( "The FederationPolicyInitializationContext provided is null. Cannot" - + " reinitalize " - + "successfully."); + + " reinitalize " + "successfully."); } - if (federationPolicyInitializationContext.getFederationStateStoreFacade() - == null) { + if (policyContext.getFederationStateStoreFacade() == null) { throw new FederationPolicyInitializationException( "The FederationStateStoreFacade provided is null. Cannot" + " reinitalize successfully."); } - if (federationPolicyInitializationContext.getFederationSubclusterResolver() - == null) { + if (policyContext.getFederationSubclusterResolver() == null) { throw new FederationPolicyInitializationException( "The FederationStateStoreFacase provided is null. Cannot" + " reinitalize successfully."); } - if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration() - == null) { + if (policyContext.getSubClusterPolicyConfiguration() == null) { throw new FederationPolicyInitializationException( "The FederationSubclusterResolver provided is null. Cannot " + "reinitalize successfully."); } String intendedType = - federationPolicyInitializationContext.getSubClusterPolicyConfiguration() - .getType(); + policyContext.getSubClusterPolicyConfiguration().getType(); if (!myType.equals(intendedType)) { throw new FederationPolicyInitializationException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java index e5dba638f87..39fdba33b9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java @@ -25,19 +25,19 @@ /** * * Implementors need to provide the ability to serliaze a policy and its - * configuration as a {@link SubClusterPolicyConfiguration}, as well as - * provide (re)initialization mechanics for the underlying + * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide + * (re)initialization mechanics for the underlying * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}. * - * The serialization aspects are used by admin APIs or a policy engine to - * store a serialized configuration in the {@code FederationStateStore}, - * while the getters methods are used to obtain a propertly inizialized - * policy in the {@code Router} and {@code AMRMProxy} respectively. + * The serialization aspects are used by admin APIs or a policy engine to store + * a serialized configuration in the {@code FederationStateStore}, while the + * getters methods are used to obtain a propertly inizialized policy in the + * {@code Router} and {@code AMRMProxy} respectively. * - * This interface by design binds together - * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and - * provide lifecycle support for serialization and deserialization, to reduce - * configuration mistakes (combining incompatible policies). + * This interface by design binds together {@link FederationAMRMProxyPolicy} and + * {@link FederationRouterPolicy} and provide lifecycle support for + * serialization and deserialization, to reduce configuration mistakes + * (combining incompatible policies). * */ public interface FederationPolicyManager { @@ -50,23 +50,17 @@ public interface FederationPolicyManager { * the implementors should attempt to reinitalize (retaining state). To affect * a complete policy reset oldInstance should be null. * - * @param federationPolicyInitializationContext the current context - * @param oldInstance the existing (possibly null) - * instance. + * @param policyContext the current context + * @param oldInstance the existing (possibly null) instance. * - * @return an updated {@link FederationAMRMProxyPolicy - }. + * @return an updated {@link FederationAMRMProxyPolicy }. * * @throws FederationPolicyInitializationException if the initialization - * cannot be completed - * properly. The oldInstance - * should be still valid in - * case of failed - * initialization. + * cannot be completed properly. The oldInstance should be still + * valid in case of failed initialization. */ FederationAMRMProxyPolicy getAMRMPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationAMRMProxyPolicy oldInstance) throws FederationPolicyInitializationException; @@ -78,21 +72,17 @@ FederationAMRMProxyPolicy getAMRMPolicy( * implementors should attempt to reinitalize (retaining state). To affect a * complete policy reset oldInstance shoulb be set to null. * - * @param federationPolicyInitializationContext the current context - * @param oldInstance the existing (possibly null) - * instance. + * @param policyContext the current context + * @param oldInstance the existing (possibly null) instance. * * @return an updated {@link FederationRouterPolicy}. * * @throws FederationPolicyInitializationException if the initalization cannot - * be completed properly. The - * oldInstance should be still - * valid in case of failed - * initialization. + * be completed properly. The oldInstance should be still valid in + * case of failed initialization. */ FederationRouterPolicy getRouterPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationRouterPolicy oldInstance) throws FederationPolicyInitializationException; @@ -102,23 +92,24 @@ FederationRouterPolicy getRouterPolicy( * store. * * @return a valid policy configuration representing this object - * parametrization. + * parametrization. * * @throws FederationPolicyInitializationException if the current state cannot - * be serialized properly + * be serialized properly */ SubClusterPolicyConfiguration serializeConf() throws FederationPolicyInitializationException; - /** * This method returns the queue this policy is configured for. + * * @return the name of the queue. */ String getQueue(); /** * This methods provides a setter for the queue this policy is specified for. + * * @param queue the name of the queue. */ void setQueue(String queue); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java new file mode 100644 index 00000000000..e853744e106 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; + +/** + * Base abstract class for {@link FederationAMRMProxyPolicy} implementations, + * that provides common validation for reinitialization. + */ +public abstract class AbstractAMRMProxyPolicy extends + AbstractConfigurableFederationPolicy implements FederationAMRMProxyPolicy { + + @Override + public void validate(WeightedPolicyInfo newPolicyInfo) + throws FederationPolicyInitializationException { + super.validate(newPolicyInfo); + Map newWeights = + newPolicyInfo.getAMRMPolicyWeights(); + if (newWeights == null || newWeights.size() < 1) { + throw new FederationPolicyInitializationException( + "Weight vector cannot be null/empty."); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java new file mode 100644 index 00000000000..679f4d5fa41 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * An implementation of the {@link FederationAMRMProxyPolicy} that simply + * broadcasts each {@link ResourceRequest} to all the available sub-clusters. + */ +public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { + + private Set knownClusterIds = new HashSet<>(); + + @Override + public void reinitialize( + FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + // overrides initialize to avoid weight checks that do no apply for + // this policy. + FederationPolicyInitializationContextValidator + .validate(policyContext, this.getClass().getCanonicalName()); + setPolicyContext(policyContext); + } + + @Override + public Map> splitResourceRequests( + List resourceRequests) throws YarnException { + + Map activeSubclusters = + getActiveSubclusters(); + + Map> answer = new HashMap<>(); + + // simply broadcast the resource request to all sub-clusters + for (SubClusterId subClusterId : activeSubclusters.keySet()) { + answer.put(subClusterId, resourceRequests); + knownClusterIds.add(subClusterId); + } + + return answer; + } + + @Override + public void notifyOfResponse(SubClusterId subClusterId, + AllocateResponse response) throws YarnException { + if (!knownClusterIds.contains(subClusterId)) { + throw new UnknownSubclusterException( + "The response is received from a subcluster that is unknown to this " + + "policy."); + } + // stateless policy does not care about responses + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java index 4a3305c43c2..0541df4346c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java @@ -17,18 +17,18 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import java.util.List; -import java.util.Map; - /** - * Implementors of this interface provide logic to split the list of {@link - * ResourceRequest}s received by the AM among various RMs. + * Implementors of this interface provide logic to split the list of + * {@link ResourceRequest}s received by the AM among various RMs. */ public interface FederationAMRMProxyPolicy extends ConfigurableFederationPolicy { @@ -37,18 +37,17 @@ public interface FederationAMRMProxyPolicy * Splits the {@link ResourceRequest}s from the client across one or more * sub-clusters based on the policy semantics (e.g., broadcast, load-based). * - * @param resourceRequests the list of {@link ResourceRequest}s from the - * AM to be split + * @param resourceRequests the list of {@link ResourceRequest}s from the AM to + * be split * * @return map of sub-cluster as identified by {@link SubClusterId} to the - * list of {@link ResourceRequest}s that should be forwarded to it + * list of {@link ResourceRequest}s that should be forwarded to it * * @throws YarnException in case the request is malformed or no viable - * sub-clusters can be found. + * sub-clusters can be found. */ Map> splitResourceRequests( - List resourceRequests) - throws YarnException; + List resourceRequests) throws YarnException; /** * This method should be invoked to notify the policy about responses being @@ -60,7 +59,7 @@ Map> splitResourceRequests( * * @throws YarnException in case the response is not valid */ - void notifyOfResponse(SubClusterId subClusterId, - AllocateResponse response) throws YarnException; + void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) + throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java new file mode 100644 index 00000000000..283f89e98ba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * An implementation of the {@link FederationAMRMProxyPolicy} interface that + * carefully multicasts the requests with the following behavior: + * + *

+ * Host localized {@link ResourceRequest}s are always forwarded to the RM that + * owns the corresponding node, based on the feedback of a + * {@link SubClusterResolver}. If the {@link SubClusterResolver} cannot resolve + * this node we default to forwarding the {@link ResourceRequest} to the home + * sub-cluster. + *

+ * + *

+ * Rack localized {@link ResourceRequest}s are forwarded to the RMs that owns + * the corresponding rack. Note that in some deployments each rack could be + * striped across multiple RMs. Thsi policy respects that. If the + * {@link SubClusterResolver} cannot resolve this rack we default to forwarding + * the {@link ResourceRequest} to the home sub-cluster. + *

+ * + *

+ * ANY requests corresponding to node/rack local requests are forwarded only to + * the set of RMs that owns the corresponding localized requests. The number of + * containers listed in each ANY is proportional to the number of localized + * container requests (associated to this ANY via the same allocateRequestId). + *

+ * + *

+ * ANY that are not associated to node/rack local requests are split among RMs + * based on the "weights" in the {@link WeightedPolicyInfo} configuration *and* + * headroom information. The {@code headroomAlpha} parameter of the policy + * configuration indicates how much headroom contributes to the splitting + * choice. Value of 1.0f indicates the weights are interpreted only as 0/1 + * boolean but all splitting is based on the advertised headroom (fallback to + * 1/N for RMs that we don't have headroom info from). An {@code headroomAlpha} + * value of 0.0f means headroom is ignored and all splitting decisions are + * proportional to the "weights" in the configuration of the policy. + *

+ * + *

+ * ANY of zero size are forwarded to all known subclusters (i.e., subclusters + * where we scheduled containers before), as they may represent a user attempt + * to cancel a previous request (and we are mostly stateless now, so should + * forward to all known RMs). + *

+ * + *

+ * Invariants: + *

+ * + *

+ * The policy always excludes non-active RMs. + *

+ * + *

+ * The policy always excludes RMs that do not appear in the policy configuration + * weights, or have a weight of 0 (even if localized resources explicit refer to + * it). + *

+ * + *

+ * (Bar rounding to closest ceiling of fractional containers) The sum of + * requests made to multiple RMs at the ANY level "adds-up" to the user request. + * The maximum possible excess in a given request is a number of containers less + * or equal to number of sub-clusters in the federation. + *

+ */ +public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { + + public static final Logger LOG = + LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class); + + private Map weights; + private SubClusterResolver resolver; + + private Map headroom; + private float hrAlpha; + private FederationStateStoreFacade federationFacade; + private AllocationBookkeeper bookkeeper; + private SubClusterId homeSubcluster; + + @Override + public void reinitialize( + FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + + // save reference to old weights + WeightedPolicyInfo tempPolicy = getPolicyInfo(); + + super.reinitialize(policyContext); + if (!getIsDirty()) { + return; + } + + Map newWeightsConverted = new HashMap<>(); + boolean allInactive = true; + WeightedPolicyInfo policy = getPolicyInfo(); + if (policy.getAMRMPolicyWeights() == null + || policy.getAMRMPolicyWeights().size() == 0) { + allInactive = false; + } else { + for (Map.Entry e : policy.getAMRMPolicyWeights() + .entrySet()) { + if (e.getValue() > 0) { + allInactive = false; + } + newWeightsConverted.put(e.getKey().toId(), e.getValue()); + } + } + if (allInactive) { + // reset the policyInfo and throw + setPolicyInfo(tempPolicy); + throw new FederationPolicyInitializationException( + "The weights used to configure " + + "this policy are all set to zero! (no ResourceRequest could be " + + "forwarded with this setting.)"); + } + + if (policyContext.getHomeSubcluster() == null) { + setPolicyInfo(tempPolicy); + throw new FederationPolicyInitializationException("The homeSubcluster " + + "filed in the context must be initialized to use this policy"); + } + + weights = newWeightsConverted; + resolver = policyContext.getFederationSubclusterResolver(); + + if (headroom == null) { + headroom = new ConcurrentHashMap<>(); + } + hrAlpha = policy.getHeadroomAlpha(); + + this.federationFacade = + policyContext.getFederationStateStoreFacade(); + this.bookkeeper = new AllocationBookkeeper(); + this.homeSubcluster = policyContext.getHomeSubcluster(); + + } + + @Override + public void notifyOfResponse(SubClusterId subClusterId, + AllocateResponse response) throws YarnException { + // stateless policy does not care about responses except tracking headroom + headroom.put(subClusterId, response.getAvailableResources()); + } + + @Override + public Map> splitResourceRequests( + List resourceRequests) throws YarnException { + + // object used to accumulate statistics about the answer, initialize with + // active subclusters. + bookkeeper.reinitialize(federationFacade.getSubClusters(true)); + + List nonLocalizedRequests = + new ArrayList(); + + SubClusterId targetId = null; + Set targetIds = null; + + // if the RR is resolved to a local subcluster add it directly (node and + // resolvable racks) + for (ResourceRequest rr : resourceRequests) { + targetId = null; + targetIds = null; + + // Handle: ANY (accumulated for later) + if (ResourceRequest.isAnyLocation(rr.getResourceName())) { + nonLocalizedRequests.add(rr); + continue; + } + + // Handle "node" requests + try { + targetId = resolver.getSubClusterForNode(rr.getResourceName()); + } catch (YarnException e) { + // this might happen as we can't differentiate node from rack names + // we log altogether later + } + if (bookkeeper.isActiveAndEnabled(targetId)) { + bookkeeper.addLocalizedNodeRR(targetId, rr); + continue; + } + + // Handle "rack" requests + try { + targetIds = resolver.getSubClustersForRack(rr.getResourceName()); + } catch (YarnException e) { + // this might happen as we can't differentiate node from rack names + // we log altogether later + } + if (targetIds != null && targetIds.size() > 0) { + for (SubClusterId tid : targetIds) { + if (bookkeeper.isActiveAndEnabled(tid)) { + bookkeeper.addRackRR(tid, rr); + } + } + continue; + } + + // Handle node/rack requests that the SubClusterResolver cannot map to + // any cluster. Defaulting to home subcluster. + if (LOG.isDebugEnabled()) { + LOG.debug("ERROR resolving sub-cluster for resourceName: " + + rr.getResourceName() + " we are falling back to homeSubCluster:" + + homeSubcluster); + } + + // If home-subcluster is not active, ignore node/rack request + if (bookkeeper.isActiveAndEnabled(homeSubcluster)) { + bookkeeper.addLocalizedNodeRR(homeSubcluster, rr); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are " + + "defaulting to is not active, the ResourceRequest " + + "will be ignored."); + } + } + } + + // handle all non-localized requests (ANY) + splitAnyRequests(nonLocalizedRequests, bookkeeper); + + return bookkeeper.getAnswer(); + } + + /** + * It splits a list of non-localized resource requests among sub-clusters. + */ + private void splitAnyRequests(List originalResourceRequests, + AllocationBookkeeper allocationBookkeeper) throws YarnException { + + for (ResourceRequest resourceRequest : originalResourceRequests) { + + // FIRST: pick the target set of subclusters (based on whether this RR + // is associated with other localized requests via an allocationId) + Long allocationId = resourceRequest.getAllocationRequestId(); + Set targetSubclusters; + if (allocationBookkeeper.getSubClustersForId(allocationId) != null) { + targetSubclusters = + allocationBookkeeper.getSubClustersForId(allocationId); + } else { + targetSubclusters = allocationBookkeeper.getActiveAndEnabledSC(); + } + + // SECOND: pick how much to ask to each RM for each request + splitIndividualAny(resourceRequest, targetSubclusters, + allocationBookkeeper); + } + } + + /** + * Return a projection of this ANY {@link ResourceRequest} that belongs to + * this sub-cluster. This is done based on the "count" of the containers that + * require locality in each sublcuster (if any) or based on the "weights" and + * headroom. + */ + private void splitIndividualAny(ResourceRequest originalResourceRequest, + Set targetSubclusters, + AllocationBookkeeper allocationBookkeeper) { + + long allocationId = originalResourceRequest.getAllocationRequestId(); + + for (SubClusterId targetId : targetSubclusters) { + float numContainer = originalResourceRequest.getNumContainers(); + + // If the ANY request has 0 containers to begin with we must forward it to + // any RM we have previously contacted (this might be the user way + // to cancel a previous request). + if (numContainer == 0 && headroom.containsKey(targetId)) { + allocationBookkeeper.addAnyRR(targetId, originalResourceRequest); + } + + // If ANY is associated with localized asks, split based on their ratio + if (allocationBookkeeper.getSubClustersForId(allocationId) != null) { + float localityBasedWeight = getLocalityBasedWeighting(allocationId, + targetId, allocationBookkeeper); + numContainer = numContainer * localityBasedWeight; + } else { + // split ANY based on load and policy configuration + float headroomWeighting = + getHeadroomWeighting(targetId, allocationBookkeeper); + float policyWeighting = + getPolicyConfigWeighting(targetId, allocationBookkeeper); + // hrAlpha controls how much headroom influencing decision + numContainer = numContainer + * (hrAlpha * headroomWeighting + (1 - hrAlpha) * policyWeighting); + } + + // if the calculated request is non-empty add it to the answer + if (numContainer > 0) { + ResourceRequest out = + ResourceRequest.newInstance(originalResourceRequest.getPriority(), + originalResourceRequest.getResourceName(), + originalResourceRequest.getCapability(), + originalResourceRequest.getNumContainers(), + originalResourceRequest.getRelaxLocality(), + originalResourceRequest.getNodeLabelExpression(), + originalResourceRequest.getExecutionTypeRequest()); + out.setAllocationRequestId(allocationId); + out.setNumContainers((int) Math.ceil(numContainer)); + if (out.isAnyLocation(out.getResourceName())) { + allocationBookkeeper.addAnyRR(targetId, out); + } else { + allocationBookkeeper.addRackRR(targetId, out); + } + } + } + } + + /** + * Compute the weight to assign to a subcluster based on how many local + * requests a subcluster is target of. + */ + private float getLocalityBasedWeighting(long reqId, SubClusterId targetId, + AllocationBookkeeper allocationBookkeeper) { + float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(); + float localWeight = + allocationBookkeeper.getNumLocalizedContainers(reqId, targetId); + return totWeight > 0 ? localWeight / totWeight : 0; + } + + /** + * Compute the "weighting" to give to a sublcuster based on the configured + * policy weights (for the active subclusters). + */ + private float getPolicyConfigWeighting(SubClusterId targetId, + AllocationBookkeeper allocationBookkeeper) { + float totWeight = allocationBookkeeper.totPolicyWeight; + Float localWeight = weights.get(targetId); + return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0; + } + + /** + * Compute the weighting based on available headroom. This is proportional to + * the available headroom memory announced by RM, or to 1/N for RMs we have + * not seen yet. If all RMs report zero headroom, we fallback to 1/N again. + */ + private float getHeadroomWeighting(SubClusterId targetId, + AllocationBookkeeper allocationBookkeeper) { + + // baseline weight for all RMs + float headroomWeighting = + 1 / (float) allocationBookkeeper.getActiveAndEnabledSC().size(); + + // if we have headroom infomration for this sub-cluster (and we are safe + // from /0 issues) + if (headroom.containsKey(targetId) + && allocationBookkeeper.totHeadroomMemory > 0) { + // compute which portion of the RMs that are active/enabled have reported + // their headroom (needed as adjustment factor) + // (note: getActiveAndEnabledSC should never be null/zero) + float ratioHeadroomKnown = allocationBookkeeper.totHeadRoomEnabledRMs + / (float) allocationBookkeeper.getActiveAndEnabledSC().size(); + + // headroomWeighting is the ratio of headroom memory in the targetId + // cluster / total memory. The ratioHeadroomKnown factor is applied to + // adjust for missing information and ensure sum of allocated containers + // closely approximate what the user asked (small excess). + headroomWeighting = (headroom.get(targetId).getMemorySize() + / allocationBookkeeper.totHeadroomMemory) * (ratioHeadroomKnown); + } + return headroomWeighting; + } + + /** + * This helper class is used to book-keep the requests made to each + * subcluster, and maintain useful statistics to split ANY requests. + */ + private final class AllocationBookkeeper { + + // the answer being accumulated + private Map> answer = new TreeMap<>(); + + // stores how many containers we have allocated in each RM for localized + // asks, used to correctly "spread" the corresponding ANY + private Map> countContainersPerRM = + new HashMap<>(); + + private Set activeAndEnabledSC = new HashSet<>(); + private long totNumLocalizedContainers = 0; + private float totHeadroomMemory = 0; + private int totHeadRoomEnabledRMs = 0; + private float totPolicyWeight = 0; + + private void reinitialize( + Map activeSubclusters) + throws YarnException { + + // reset data structures + answer.clear(); + countContainersPerRM.clear(); + activeAndEnabledSC.clear(); + totNumLocalizedContainers = 0; + totHeadroomMemory = 0; + totHeadRoomEnabledRMs = 0; + totPolicyWeight = 0; + + // pre-compute the set of subclusters that are both active and enabled by + // the policy weights, and accumulate their total weight + for (Map.Entry entry : weights.entrySet()) { + if (entry.getValue() > 0 + && activeSubclusters.containsKey(entry.getKey())) { + activeAndEnabledSC.add(entry.getKey()); + totPolicyWeight += entry.getValue(); + } + } + + if (activeAndEnabledSC.size() < 1) { + throw new NoActiveSubclustersException( + "None of the subclusters enabled in this policy (weight>0) are " + + "currently active we cannot forward the ResourceRequest(s)"); + } + + // pre-compute headroom-based weights for active/enabled subclusters + for (Map.Entry r : headroom.entrySet()) { + if (activeAndEnabledSC.contains(r.getKey())) { + totHeadroomMemory += r.getValue().getMemorySize(); + totHeadRoomEnabledRMs++; + } + } + + } + + /** + * Add to the answer a localized node request, and keeps track of statistics + * on a per-allocation-id and per-subcluster bases. + */ + private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) { + Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + + if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) { + countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>()); + } + if (!countContainersPerRM.get(rr.getAllocationRequestId()) + .containsKey(targetId)) { + countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, + new AtomicLong(0)); + } + countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId) + .addAndGet(rr.getNumContainers()); + + totNumLocalizedContainers += rr.getNumContainers(); + + internalAddToAnswer(targetId, rr); + } + + /** + * Add a rack-local request to the final asnwer. + */ + public void addRackRR(SubClusterId targetId, ResourceRequest rr) { + Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName())); + internalAddToAnswer(targetId, rr); + } + + /** + * Add an ANY request to the final answer. + */ + private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { + Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName())); + internalAddToAnswer(targetId, rr); + } + + private void internalAddToAnswer(SubClusterId targetId, + ResourceRequest partialRR) { + if (!answer.containsKey(targetId)) { + answer.put(targetId, new ArrayList()); + } + answer.get(targetId).add(partialRR); + } + + /** + * Return all known subclusters associated with an allocation id. + * + * @param allocationId the allocation id considered + * + * @return the list of {@link SubClusterId}s associated with this allocation + * id + */ + private Set getSubClustersForId(long allocationId) { + if (countContainersPerRM.get(allocationId) == null) { + return null; + } + return countContainersPerRM.get(allocationId).keySet(); + } + + /** + * Return the answer accumulated so far. + * + * @return the answer + */ + private Map> getAnswer() { + return answer; + } + + /** + * Return the set of sub-clusters that are both active and allowed by our + * policy (weight > 0). + * + * @return a set of active and enabled {@link SubClusterId}s + */ + private Set getActiveAndEnabledSC() { + return activeAndEnabledSC; + } + + /** + * Return the total number of container coming from localized requests. + */ + private long getTotNumLocalizedContainers() { + return totNumLocalizedContainers; + } + + /** + * Returns the number of containers matching an allocation Id that are + * localized in the targetId subcluster. + */ + private long getNumLocalizedContainers(long allocationId, + SubClusterId targetId) { + AtomicLong c = countContainersPerRM.get(allocationId).get(targetId); + return c == null ? 0 : c.get(); + } + + /** + * Returns true is the subcluster request is both active and enabled. + */ + private boolean isActiveAndEnabled(SubClusterId targetId) { + if (targetId == null) { + return false; + } else { + return getActiveAndEnabledSC().contains(targetId); + } + } + + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java index 99da20be713..ef72647baec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java @@ -17,4 +17,3 @@ */ /** AMRMPRoxy policies. **/ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java index a0fa37f221e..62eb03bafc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java @@ -17,10 +17,19 @@ package org.apache.hadoop.yarn.server.federation.policies.dao; -import com.sun.jersey.api.json.JSONConfiguration; -import com.sun.jersey.api.json.JSONJAXBContext; -import com.sun.jersey.api.json.JSONMarshaller; -import com.sun.jersey.api.json.JSONUnmarshaller; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -29,24 +38,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlRootElement; -import java.io.StringReader; -import java.io.StringWriter; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; +import com.sun.jersey.api.json.JSONUnmarshaller; /** * This is a DAO class for the configuration of parameteres for federation * policies. This generalizes several possible configurations as two lists of - * {@link SubClusterIdInfo} and corresponding weights as a - * {@link Float}. The interpretation of the weight is left to the logic in - * the policy. + * {@link SubClusterIdInfo} and corresponding weights as a {@link Float}. The + * interpretation of the weight is left to the logic in the policy. */ @InterfaceAudience.Private @@ -57,12 +58,14 @@ public class WeightedPolicyInfo { private static final Logger LOG = LoggerFactory.getLogger(WeightedPolicyInfo.class); - + private static JSONJAXBContext jsonjaxbContext = initContext(); private Map routerPolicyWeights = new HashMap<>(); private Map amrmPolicyWeights = new HashMap<>(); private float headroomAlpha; - private static JSONJAXBContext jsonjaxbContext = initContext(); + public WeightedPolicyInfo() { + // JAXB needs this + } private static JSONJAXBContext initContext() { try { @@ -74,8 +77,46 @@ private static JSONJAXBContext initContext() { return null; } - public WeightedPolicyInfo() { - //JAXB needs this + /** + * Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON + * representation. + * + * @param bb the input byte representation. + * + * @return the {@link WeightedPolicyInfo} represented. + * + * @throws FederationPolicyInitializationException if a deserializaiton error + * occurs. + */ + public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb) + throws FederationPolicyInitializationException { + + if (jsonjaxbContext == null) { + throw new FederationPolicyInitializationException( + "JSONJAXBContext should" + " not be null."); + } + + try { + JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller(); + final byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + String params = new String(bytes, Charset.forName("UTF-8")); + + WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON( + new StringReader(params), WeightedPolicyInfo.class); + return weightedPolicyInfo; + } catch (JAXBException j) { + throw new FederationPolicyInitializationException(j); + } + } + + /** + * Getter of the router weights. + * + * @return the router weights. + */ + public Map getRouterPolicyWeights() { + return routerPolicyWeights; } /** @@ -88,26 +129,9 @@ public void setRouterPolicyWeights( this.routerPolicyWeights = policyWeights; } - /** - * Setter method for ARMRMProxy weights. - * - * @param policyWeights the amrmproxy weights. - */ - public void setAMRMPolicyWeights( - Map policyWeights) { - this.amrmPolicyWeights = policyWeights; - } - - /** - * Getter of the router weights. - * @return the router weights. - */ - public Map getRouterPolicyWeights() { - return routerPolicyWeights; - } - /** * Getter for AMRMProxy weights. + * * @return the AMRMProxy weights. */ public Map getAMRMPolicyWeights() { @@ -115,53 +139,28 @@ public Map getAMRMPolicyWeights() { } /** - * Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON - * representation. + * Setter method for ARMRMProxy weights. * - * @param bb the input byte representation. - * - * @return the {@link WeightedPolicyInfo} represented. - * - * @throws FederationPolicyInitializationException if a deserializaiton error - * occurs. + * @param policyWeights the amrmproxy weights. */ - public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb) - throws FederationPolicyInitializationException { - - if (jsonjaxbContext == null) { - throw new FederationPolicyInitializationException("JSONJAXBContext should" - + " not be null."); - } - - try { - JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller(); - final byte[] bytes = new byte[bb.remaining()]; - bb.get(bytes); - String params = new String(bytes, Charset.forName("UTF-8")); - - WeightedPolicyInfo weightedPolicyInfo = unmarshaller - .unmarshalFromJSON(new StringReader(params), - WeightedPolicyInfo.class); - return weightedPolicyInfo; - } catch (JAXBException j) { - throw new FederationPolicyInitializationException(j); - } + public void setAMRMPolicyWeights(Map policyWeights) { + this.amrmPolicyWeights = policyWeights; } /** - * Converts the policy into a byte array representation in the input {@link - * ByteBuffer}. + * Converts the policy into a byte array representation in the input + * {@link ByteBuffer}. * * @return byte array representation of this policy configuration. * * @throws FederationPolicyInitializationException if a serialization error - * occurs. + * occurs. */ public ByteBuffer toByteBuffer() throws FederationPolicyInitializationException { if (jsonjaxbContext == null) { - throw new FederationPolicyInitializationException("JSONJAXBContext should" - + " not be null."); + throw new FederationPolicyInitializationException( + "JSONJAXBContext should" + " not be null."); } try { String s = toJSONString(); @@ -186,22 +185,21 @@ public boolean equals(Object other) { return false; } - WeightedPolicyInfo otherPolicy = - (WeightedPolicyInfo) other; + WeightedPolicyInfo otherPolicy = (WeightedPolicyInfo) other; Map otherAMRMWeights = otherPolicy.getAMRMPolicyWeights(); Map otherRouterWeights = otherPolicy.getRouterPolicyWeights(); - boolean amrmWeightsMatch = otherAMRMWeights != null && - getAMRMPolicyWeights() != null && - CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(), - getAMRMPolicyWeights().entrySet()); + boolean amrmWeightsMatch = + otherAMRMWeights != null && getAMRMPolicyWeights() != null + && CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(), + getAMRMPolicyWeights().entrySet()); - boolean routerWeightsMatch = otherRouterWeights != null && - getRouterPolicyWeights() != null && - CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(), - getRouterPolicyWeights().entrySet()); + boolean routerWeightsMatch = + otherRouterWeights != null && getRouterPolicyWeights() != null + && CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(), + getRouterPolicyWeights().entrySet()); return amrmWeightsMatch && routerWeightsMatch; } @@ -215,10 +213,10 @@ public int hashCode() { * Return the parameter headroomAlpha, used by policies that balance * weight-based and load-based considerations in their decisions. * - * For policies that use this parameter, values close to 1 indicate that - * most of the decision should be based on currently observed headroom from - * various sub-clusters, values close to zero, indicate that the decision - * should be mostly based on weights and practically ignore current load. + * For policies that use this parameter, values close to 1 indicate that most + * of the decision should be based on currently observed headroom from various + * sub-clusters, values close to zero, indicate that the decision should be + * mostly based on weights and practically ignore current load. * * @return the value of headroomAlpha. */ @@ -227,13 +225,13 @@ public float getHeadroomAlpha() { } /** - * Set the parameter headroomAlpha, used by policies that balance - * weight-based and load-based considerations in their decisions. + * Set the parameter headroomAlpha, used by policies that balance weight-based + * and load-based considerations in their decisions. * - * For policies that use this parameter, values close to 1 indicate that - * most of the decision should be based on currently observed headroom from - * various sub-clusters, values close to zero, indicate that the decision - * should be mostly based on weights and practically ignore current load. + * For policies that use this parameter, values close to 1 indicate that most + * of the decision should be based on currently observed headroom from various + * sub-clusters, values close to zero, indicate that the decision should be + * mostly based on weights and practically ignore current load. * * @param headroomAlpha the value to use for balancing. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java index 43f5b83e785..c292e52dd31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java @@ -17,4 +17,3 @@ */ /** DAO objects for serializing/deserializing policy configurations. **/ package org.apache.hadoop.yarn.server.federation.policies.dao; - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java index 3318da9dac9..ad2d5430637 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java @@ -17,4 +17,3 @@ */ /** Exceptions for policies. **/ package org.apache.hadoop.yarn.server.federation.policies.exceptions; - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java index 7d9a121eb35..fa3fcc5ef9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java @@ -17,4 +17,3 @@ */ /** Federation Policies. **/ package org.apache.hadoop.yarn.server.federation.policies; - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java new file mode 100644 index 00000000000..f49af1d56e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import java.util.Map; + +import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; + +/** + * Base abstract class for {@link FederationRouterPolicy} implementations, that + * provides common validation for reinitialization. + */ +public abstract class AbstractRouterPolicy extends + AbstractConfigurableFederationPolicy implements FederationRouterPolicy { + + @Override + public void validate(WeightedPolicyInfo newPolicyInfo) + throws FederationPolicyInitializationException { + super.validate(newPolicyInfo); + Map newWeights = + newPolicyInfo.getRouterPolicyWeights(); + if (newWeights == null || newWeights.size() < 1) { + throw new FederationPolicyInitializationException( + "Weight vector cannot be null/empty."); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java index 42c86cc0dca..90ea0a87ce6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java @@ -35,11 +35,10 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy { * @param appSubmissionContext the context for the app being submitted. * * @return the sub-cluster as identified by {@link SubClusterId} to route the - * request to. + * request to. * * @throws YarnException if the policy cannot determine a viable subcluster. */ SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException; + ApplicationSubmissionContext appSubmissionContext) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index e57709f432b..5de749fdec1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,8 +17,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.Map; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -30,34 +30,27 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; -import java.util.Map; - /** * This implements a simple load-balancing policy. The policy "weights" are * binary 0/1 values that enable/disable each sub-cluster, and the policy peaks * the sub-cluster with the least load to forward this application. */ -public class LoadBasedRouterPolicy - extends BaseWeightedRouterPolicy { - - private static final Log LOG = - LogFactory.getLog(LoadBasedRouterPolicy.class); +public class LoadBasedRouterPolicy extends AbstractRouterPolicy { @Override - public void reinitialize(FederationPolicyInitializationContext - federationPolicyContext) + public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { // remember old policyInfo WeightedPolicyInfo tempPolicy = getPolicyInfo(); - //attempt new initialization - super.reinitialize(federationPolicyContext); + // attempt new initialization + super.reinitialize(policyContext); - //check extra constraints + // check extra constraints for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) { if (weight != 0 && weight != 1) { - //reset to old policyInfo if check fails + // reset to old policyInfo if check fails setPolicyInfo(tempPolicy); throw new FederationPolicyInitializationException( this.getClass().getCanonicalName() @@ -69,18 +62,16 @@ public void reinitialize(FederationPolicyInitializationContext @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map activeSubclusters = getActiveSubclusters(); - Map weights = getPolicyInfo() - .getRouterPolicyWeights(); + Map weights = + getPolicyInfo().getRouterPolicyWeights(); SubClusterIdInfo chosen = null; long currBestMem = -1; - for (Map.Entry entry : - activeSubclusters + for (Map.Entry entry : activeSubclusters .entrySet()) { SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey()); if (weights.containsKey(id) && weights.get(id) > 0) { @@ -95,8 +86,7 @@ public SubClusterId getHomeSubcluster( return chosen.toId(); } - private long getAvailableMemory(SubClusterInfo value) - throws YarnException { + private long getAvailableMemory(SubClusterInfo value) throws YarnException { try { long mem = -1; JSONObject obj = new JSONObject(value.getCapability()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index a8ac5f7864a..bc3a1f790fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,39 +17,32 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.Map; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import java.util.Map; - /** * This implements a policy that interprets "weights" as a ordered list of * preferences among sub-clusters. Highest weight among active subclusters is * chosen. */ -public class PriorityRouterPolicy - extends BaseWeightedRouterPolicy { - - private static final Log LOG = - LogFactory.getLog(PriorityRouterPolicy.class); +public class PriorityRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map activeSubclusters = getActiveSubclusters(); // This finds the sub-cluster with the highest weight among the // currently active ones. - Map weights = getPolicyInfo() - .getRouterPolicyWeights(); + Map weights = + getPolicyInfo().getRouterPolicyWeights(); SubClusterId chosen = null; Float currentBest = Float.MIN_VALUE; for (SubClusterId id : activeSubclusters.keySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index 17749618211..b8f9cc329de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -17,6 +17,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -25,11 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; - /** * This simple policy picks at uniform random among any of the currently active * subclusters. This policy is easy to use and good for testing. @@ -39,7 +39,7 @@ * of the "weights", in which case the {@link UniformRandomRouterPolicy} send * load to them, while {@code WeightedRandomRouterPolicy} does not. */ -public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy { +public class UniformRandomRouterPolicy extends AbstractRouterPolicy { private Random rand; @@ -49,14 +49,14 @@ public UniformRandomRouterPolicy() { @Override public void reinitialize( - FederationPolicyInitializationContext federationPolicyContext) + FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { FederationPolicyInitializationContextValidator - .validate(federationPolicyContext, this.getClass().getCanonicalName()); + .validate(policyContext, this.getClass().getCanonicalName()); - //note: this overrides BaseWeighterRouterPolicy and ignores the weights + // note: this overrides AbstractRouterPolicy and ignores the weights - setPolicyContext(federationPolicyContext); + setPolicyContext(policyContext); } /** @@ -64,21 +64,19 @@ public void reinitialize( * depend on the weights in the policy). * * @param appSubmissionContext the context for the app being submitted - * (ignored). + * (ignored). * * @return a randomly chosen subcluster. * * @throws YarnException if there are no active subclusters. */ public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map activeSubclusters = getActiveSubclusters(); - List list = - new ArrayList<>(activeSubclusters.keySet()); + List list = new ArrayList<>(activeSubclusters.keySet()); return list.get(rand.nextInt(list.size())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index 077767784f5..ac75ae9cf8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -18,32 +18,30 @@ package org.apache.hadoop.yarn.server.federation.policies.router; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; - -import java.util.Map; -import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This policy implements a weighted random sample among currently active * sub-clusters. */ -public class WeightedRandomRouterPolicy - extends BaseWeightedRouterPolicy { +public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { - private static final Log LOG = - LogFactory.getLog(WeightedRandomRouterPolicy.class); + private static final Logger LOG = + LoggerFactory.getLogger(WeightedRandomRouterPolicy.class); private Random rand = new Random(System.currentTimeMillis()); @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) - throws YarnException { + ApplicationSubmissionContext appSubmissionContext) throws YarnException { Map activeSubclusters = getActiveSubclusters(); @@ -52,13 +50,13 @@ public SubClusterId getHomeSubcluster( // changes dynamically (and this would unfairly spread the load to // sub-clusters adjacent to an inactive one), hence we need to count/scan // the list and based on weight pick the next sub-cluster. - Map weights = getPolicyInfo() - .getRouterPolicyWeights(); + Map weights = + getPolicyInfo().getRouterPolicyWeights(); float totActiveWeight = 0; - for(Map.Entry entry : weights.entrySet()){ - if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey() - .toId())){ + for (Map.Entry entry : weights.entrySet()) { + if (entry.getKey() != null + && activeSubclusters.containsKey(entry.getKey().toId())) { totActiveWeight += entry.getValue(); } } @@ -73,7 +71,7 @@ public SubClusterId getHomeSubcluster( return id; } } - //should never happen + // should never happen return null; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java index 5d0fcb66fb2..e445ac33377 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java @@ -17,4 +17,3 @@ */ /** Router policies. **/ package org.apache.hadoop.yarn.server.federation.policies.router; - diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java index 8238633f354..6b4f60caf20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java @@ -57,11 +57,11 @@ public Set getSubClustersForRack(String rackname) return rackToSubClusters.get(rackname); } - protected Map getNodeToSubCluster() { + public Map getNodeToSubCluster() { return nodeToSubCluster; } - protected Map> getRackToSubClusters() { + public Map> getRackToSubClusters() { return rackToSubClusters; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 8da92b9876c..ba897da71f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -22,14 +22,17 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -49,6 +52,7 @@ public abstract class BaseFederationPoliciesTest { private ApplicationSubmissionContext applicationSubmissionContext = mock(ApplicationSubmissionContext.class); private Random rand = new Random(); + private SubClusterId homeSubCluster; @Test public void testReinitilialize() throws YarnException { @@ -88,16 +92,22 @@ public void testReinitilializeBad3() throws YarnException { getPolicy().reinitialize(fpc); } - @Test(expected = NoActiveSubclustersException.class) + @Test(expected = FederationPolicyException.class) public void testNoSubclusters() throws YarnException { // empty the activeSubclusters map FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), getPolicyInfo(), new HashMap<>()); - ConfigurableFederationPolicy currentPolicy = getPolicy(); - if (currentPolicy instanceof FederationRouterPolicy) { - ((FederationRouterPolicy) currentPolicy) + ConfigurableFederationPolicy localPolicy = getPolicy(); + if (localPolicy instanceof FederationRouterPolicy) { + ((FederationRouterPolicy) localPolicy) .getHomeSubcluster(getApplicationSubmissionContext()); + } else { + String[] hosts = new String[] {"host1", "host2" }; + List resourceRequests = FederationPoliciesTestUtil + .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + ((FederationAMRMProxyPolicy) localPolicy) + .splitResourceRequests(resourceRequests); } } @@ -152,4 +162,12 @@ public void setRand(Random rand) { this.rand = rand; } + public SubClusterId getHomeSubCluster() { + return homeSubCluster; + } + + public void setHomeSubCluster(SubClusterId homeSubCluster) { + this.homeSubCluster = homeSubCluster; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index e840b3fee0f..c79fd2a124a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -16,22 +16,20 @@ * limitations under the License. */ - package org.apache.hadoop.yarn.server.federation.policies; +import java.nio.ByteBuffer; + import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.junit.Before; import org.junit.Test; -import java.nio.ByteBuffer; - /** * Test class for {@link FederationPolicyInitializationContextValidator}. */ @@ -45,11 +43,10 @@ public class TestFederationPolicyInitializationContextValidator { @Before public void setUp() throws Exception { goodFacade = FederationPoliciesTestUtil.initFacade(); - goodConfig = - new MockPolicyManager().serializeConf(); - goodSR =FederationPoliciesTestUtil.initResolver(); - context = new - FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade); + goodConfig = new MockPolicyManager().serializeConf(); + goodSR = FederationPoliciesTestUtil.initResolver(); + context = new FederationPolicyInitializationContext(goodConfig, goodSR, + goodFacade); } @Test @@ -100,8 +97,7 @@ private class MockPolicyManager implements FederationPolicyManager { @Override public FederationAMRMProxyPolicy getAMRMPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationAMRMProxyPolicy oldInstance) throws FederationPolicyInitializationException { return null; @@ -109,8 +105,7 @@ public FederationAMRMProxyPolicy getAMRMPolicy( @Override public FederationRouterPolicy getRouterPolicy( - FederationPolicyInitializationContext - federationPolicyInitializationContext, + FederationPolicyInitializationContext policyContext, FederationRouterPolicy oldInstance) throws FederationPolicyInitializationException { return null; @@ -120,8 +115,8 @@ public FederationRouterPolicy getRouterPolicy( public SubClusterPolicyConfiguration serializeConf() throws FederationPolicyInitializationException { ByteBuffer buf = ByteBuffer.allocate(0); - return SubClusterPolicyConfiguration - .newInstance("queue1", this.getClass().getCanonicalName(), buf); + return SubClusterPolicyConfiguration.newInstance("queue1", + this.getClass().getCanonicalName(), buf); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java new file mode 100644 index 00000000000..a21f53dc924 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test class for the {@link BroadcastAMRMProxyPolicy}. + */ +public class TestBroadcastAMRMProxyFederationPolicy + extends BaseFederationPoliciesTest { + + @Before + public void setUp() throws Exception { + setPolicy(new BroadcastAMRMProxyPolicy()); + // needed for base test to work + setPolicyInfo(mock(WeightedPolicyInfo.class)); + + for (int i = 1; i <= 2; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + mock(WeightedPolicyInfo.class), getActiveSubclusters()); + + } + + @Test + public void testSplitAllocateRequest() throws Exception { + // verify the request is broadcasted to all subclusters + String[] hosts = new String[] {"host1", "host2" }; + List resourceRequests = FederationPoliciesTestUtil + .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + Assert.assertTrue(response.size() == 2); + for (Map.Entry> entry : response + .entrySet()) { + Assert.assertTrue(getActiveSubclusters().get(entry.getKey()) != null); + for (ResourceRequest r : entry.getValue()) { + Assert.assertTrue(resourceRequests.contains(r)); + } + } + for (SubClusterId subClusterId : getActiveSubclusters().keySet()) { + for (ResourceRequest r : response.get(subClusterId)) { + Assert.assertTrue(resourceRequests.contains(r)); + } + } + } + + @Test + public void testNotifyOfResponse() throws Exception { + String[] hosts = new String[] {"host1", "host2" }; + List resourceRequests = FederationPoliciesTestUtil + .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + try { + ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( + SubClusterId.newInstance("sc3"), mock(AllocateResponse.class)); + Assert.fail(); + } catch (FederationPolicyException f) { + System.out.println("Expected: " + f.getMessage()); + } + + ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( + SubClusterId.newInstance("sc1"), mock(AllocateResponse.class)); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java new file mode 100644 index 00000000000..2654a063519 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl; +import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple test class for the {@link LocalityMulticastAMRMProxyPolicy}. + */ +public class TestLocalityMulticastAMRMProxyPolicy + extends BaseFederationPoliciesTest { + + public static final Logger LOG = + LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class); + + @Before + public void setUp() throws Exception { + setPolicy(new LocalityMulticastAMRMProxyPolicy()); + setPolicyInfo(new WeightedPolicyInfo()); + Map routerWeights = new HashMap<>(); + Map amrmWeights = new HashMap<>(); + + // simulate 20 subclusters with a 5% chance of being inactive + for (int i = 0; i < 6; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i); + // sub-cluster 3 is not active + if (i != 3) { + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + + float weight = 1 / 10f; + routerWeights.put(sc, weight); + amrmWeights.put(sc, weight); + // sub-cluster 4 is "disabled" in the weights + if (i == 4) { + routerWeights.put(sc, 0f); + amrmWeights.put(sc, 0f); + } + } + + getPolicyInfo().setRouterPolicyWeights(routerWeights); + getPolicyInfo().setAMRMPolicyWeights(amrmWeights); + getPolicyInfo().setHeadroomAlpha(0.5f); + setHomeSubCluster(SubClusterId.newInstance("homesubcluster")); + + } + + @Test + public void testReinitilialize() throws YarnException { + initializePolicy(); + } + + private void initializePolicy() throws YarnException { + setFederationPolicyContext(new FederationPolicyInitializationContext()); + SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver(); + getFederationPolicyContext().setFederationSubclusterResolver(resolver); + ByteBuffer buf = getPolicyInfo().toByteBuffer(); + getFederationPolicyContext().setSubClusterPolicyConfiguration( + SubClusterPolicyConfiguration.newInstance("queue1", + getPolicy().getClass().getCanonicalName(), buf)); + getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster()); + FederationPoliciesTestUtil.initializePolicyContext( + getFederationPolicyContext(), getPolicy(), getPolicyInfo(), + getActiveSubclusters()); + } + + @Test + public void testSplitBasedOnHeadroom() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + List resourceRequests = createSimpleRequest(); + + prepPolicyWithHeadroom(); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // pretty print requests + LOG.info("Initial headroom"); + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + // based on headroom, we expect 75 containers to got to subcluster0, + // as it advertise lots of headroom (100), no containers for sublcuster1 + // as it advertise zero headroom, 1 to subcluster 2 (as it advertise little + // headroom (1), and 25 to subcluster5 which has unknown headroom, and so + // it gets 1/4th of the load + checkExpectedAllocation(response, "subcluster0", 1, 75); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 1); + checkExpectedAllocation(response, "subcluster5", 1, 25); + + // notify a change in headroom and try again + AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar); + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + LOG.info("After headroom update"); + prettyPrintRequests(response); + validateSplit(response, resourceRequests); + + // we simulated a change in headroom for subcluster2, which will now + // have the same headroom of subcluster0 and so it splits the requests + // note that the total is still less or equal to (userAsk + numSubClusters) + checkExpectedAllocation(response, "subcluster0", 1, 38); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 38); + checkExpectedAllocation(response, "subcluster5", 1, 25); + + } + + @Test(timeout = 5000) + public void testStressPolicy() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + initializePolicy(); + + int numRR = 1000; + List resourceRequests = createLargeRandomList(numRR); + + prepPolicyWithHeadroom(); + + int numIterations = 1000; + long tstart = System.currentTimeMillis(); + for (int i = 0; i < numIterations; i++) { + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + validateSplit(response, resourceRequests); + } + long tend = System.currentTimeMillis(); + + LOG.info("Performed " + numIterations + " policy invocations (and " + + "validations) in " + (tend - tstart) + "ms"); + } + + @Test + public void testFWDAllZeroANY() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(0.5f); + + initializePolicy(); + List resourceRequests = createZeroSizedANYRequest(); + + // this receives responses from sc0,sc1,sc2 + prepPolicyWithHeadroom(); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // we expect all three to appear for a zero-sized ANY + + // pretty print requests + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + // we expect the zero size request to be sent to the first 3 rm (due to + // the fact that we received responses only from these 3 sublcusters) + checkExpectedAllocation(response, "subcluster0", 1, 0); + checkExpectedAllocation(response, "subcluster1", 1, 0); + checkExpectedAllocation(response, "subcluster2", 1, 0); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", -1, -1); + } + + @Test + public void testSplitBasedOnHeadroomAndWeights() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. + + // Configure policy to be 50% headroom based and 50% weight based + getPolicyInfo().setHeadroomAlpha(0.5f); + + initializePolicy(); + List resourceRequests = createSimpleRequest(); + + prepPolicyWithHeadroom(); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // pretty print requests + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + // in this case the headroom allocates 50 containers, while weights allocate + // the rest. due to weights we have 12.5 (round to 13) containers for each + // sublcuster, the rest is due to headroom. + checkExpectedAllocation(response, "subcluster0", 1, 50); + checkExpectedAllocation(response, "subcluster1", 1, 13); + checkExpectedAllocation(response, "subcluster2", 1, 13); + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + checkExpectedAllocation(response, "subcluster5", 1, 25); + + } + + private void prepPolicyWithHeadroom() throws YarnException { + AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar); + + ar = getAllocateResponseWithTargetHeadroom(0); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar); + + ar = getAllocateResponseWithTargetHeadroom(1); + ((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar); + } + + private AllocateResponse getAllocateResponseWithTargetHeadroom( + int numContainers) { + return AllocateResponse.newInstance(0, null, null, + Collections. emptyList(), + Resource.newInstance(numContainers * 1024, numContainers), null, 10, + null, Collections. emptyList()); + } + + @Test + public void testSplitAllocateRequest() throws Exception { + + // Test a complex List is split correctly + initializePolicy(); + + // modify default initialization to include a "homesubcluster" + // which we will use as the default for when nodes or racks are unknown + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(getHomeSubCluster()); + getActiveSubclusters().put(getHomeSubCluster(), sci); + SubClusterIdInfo sc = new SubClusterIdInfo(getHomeSubCluster().getId()); + + getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f); + getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f); + + FederationPoliciesTestUtil.initializePolicyContext( + getFederationPolicyContext(), getPolicy(), getPolicyInfo(), + getActiveSubclusters()); + + List resourceRequests = createComplexRequest(); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + validateSplit(response, resourceRequests); + prettyPrintRequests(response); + + // we expect 4 entry for home subcluster (3 for request-id 4, and a part + // of the broadcast of request-id 2 + checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23); + + // for subcluster0 we expect 3 entry from request-id 0, and 3 from + // request-id 3, as well as part of the request-id 2 broadast + checkExpectedAllocation(response, "subcluster0", 7, 26); + + // we expect 5 entry for subcluster1 (4 from request-id 1, and part + // of the broadcast of request-id 2 + checkExpectedAllocation(response, "subcluster1", 5, 25); + + // sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the + // broadcast of request-id 2, and no request-id 0 + checkExpectedAllocation(response, "subcluster2", 4, 23); + + // subcluster id 3, 4 should not appear (due to weights or active/inactive) + checkExpectedAllocation(response, "subcluster3", -1, -1); + checkExpectedAllocation(response, "subcluster4", -1, -1); + + // subcluster5 should get only part of the request-id 2 broadcast + checkExpectedAllocation(response, "subcluster5", 1, 20); + + // check that the allocations that show up are what expected + for (ResourceRequest rr : response.get(getHomeSubCluster())) { + Assert.assertTrue(rr.getAllocationRequestId() == 4L + || rr.getAllocationRequestId() == 2L); + } + + for (ResourceRequest rr : response.get(getHomeSubCluster())) { + Assert.assertTrue(rr.getAllocationRequestId() != 1L); + } + + List rrs = + response.get(SubClusterId.newInstance("subcluster0")); + for (ResourceRequest rr : rrs) { + Assert.assertTrue(rr.getAllocationRequestId() != 1L); + } + + for (ResourceRequest rr : response + .get(SubClusterId.newInstance("subcluster2"))) { + Assert.assertTrue(rr.getAllocationRequestId() != 0L); + } + + for (ResourceRequest rr : response + .get(SubClusterId.newInstance("subcluster5"))) { + Assert.assertTrue(rr.getAllocationRequestId() >= 2); + Assert.assertTrue(rr.getRelaxLocality()); + } + } + + // check that the number of containers in the first ResourceRequest in + // response for this sub-cluster matches expectations. -1 indicate the + // response should be null + private void checkExpectedAllocation( + Map> response, String subCluster, + long totResourceRequests, long totContainers) { + if (totContainers == -1) { + Assert.assertNull(response.get(SubClusterId.newInstance(subCluster))); + } else { + SubClusterId sc = SubClusterId.newInstance(subCluster); + Assert.assertEquals(totResourceRequests, response.get(sc).size()); + + long actualContCount = 0; + for (ResourceRequest rr : response.get(sc)) { + actualContCount += rr.getNumContainers(); + } + Assert.assertEquals(totContainers, actualContCount); + } + } + + private void validateSplit(Map> split, + List original) throws YarnException { + + SubClusterResolver resolver = + getFederationPolicyContext().getFederationSubclusterResolver(); + + // Apply general validation rules + int numUsedSubclusters = split.size(); + + Set originalIds = new HashSet<>(); + Set splitIds = new HashSet<>(); + + int originalContainers = 0; + for (ResourceRequest rr : original) { + originalContainers += rr.getNumContainers(); + originalIds.add(rr.getAllocationRequestId()); + } + + int splitContainers = 0; + for (Map.Entry> rrs : split + .entrySet()) { + for (ResourceRequest rr : rrs.getValue()) { + splitContainers += rr.getNumContainers(); + splitIds.add(rr.getAllocationRequestId()); + // check node-local asks are sent to right RM (only) + SubClusterId fid = null; + try { + fid = resolver.getSubClusterForNode(rr.getResourceName()); + } catch (YarnException e) { + // ignore code will handle + } + if (!rrs.getKey().equals(getHomeSubCluster()) && fid != null + && !fid.equals(rrs.getKey())) { + Assert.fail("A node-local (or resolvable rack-local) RR should not " + + "be send to an RM other than what it resolves to."); + } + } + } + + // check we are not inventing Allocation Ids + Assert.assertEquals(originalIds, splitIds); + + // check we are not exceedingly replicating the container asks among + // RMs (a little is allowed due to rounding of fractional splits) + Assert.assertTrue( + " Containers requested (" + splitContainers + ") should " + + "not exceed the original count of containers (" + + originalContainers + ") by more than the number of subclusters (" + + numUsedSubclusters + ")", + originalContainers + numUsedSubclusters >= splitContainers); + + // Test target Ids + for (SubClusterId targetId : split.keySet()) { + Assert.assertTrue("Target subclusters should be in the active set", + getActiveSubclusters().containsKey(targetId)); + Assert.assertTrue( + "Target subclusters (" + targetId + ") should have weight >0 in " + + "the policy ", + getPolicyInfo().getRouterPolicyWeights() + .get(new SubClusterIdInfo(targetId)) > 0); + } + } + + private void prettyPrintRequests( + Map> response) { + for (Map.Entry> entry : response + .entrySet()) { + String str = ""; + for (ResourceRequest rr : entry.getValue()) { + str += " [id:" + rr.getAllocationRequestId() + " loc:" + + rr.getResourceName() + " numCont:" + rr.getNumContainers() + + "], "; + } + LOG.info(entry.getKey() + " --> " + str); + } + } + + private List createLargeRandomList(int numRR) + throws Exception { + + List out = new ArrayList<>(); + Random rand = new Random(1); + DefaultSubClusterResolverImpl resolver = + (DefaultSubClusterResolverImpl) getFederationPolicyContext() + .getFederationSubclusterResolver(); + + List nodes = + new ArrayList<>(resolver.getNodeToSubCluster().keySet()); + + for (int i = 0; i < numRR; i++) { + String nodeName = nodes.get(rand.nextInt(nodes.size())); + long allocationId = (long) rand.nextInt(20); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId, + nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean())); + } + return out; + } + + private List createSimpleRequest() throws Exception { + + List out = new ArrayList<>(); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, true)); + return out; + } + + private List createZeroSizedANYRequest() throws Exception { + + List out = new ArrayList<>(); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 0, null, true)); + return out; + } + + private List createComplexRequest() throws Exception { + + List out = new ArrayList<>(); + + // create a single container request in sc0 + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + "subcluster0-rack0", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(0L, + ResourceRequest.ANY, 1024, 1, 1, 1, null, false)); + + // create a single container request with 3 alternative hosts across sc1,sc2 + // where we want 2 containers in sc1 and 1 in sc2 + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster1-rack1-host1", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster1-rack1-host2", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster2-rack3-host3", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster1-rack1", 1024, 1, 1, 2, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + "subcluster2-rack3", 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(1L, + ResourceRequest.ANY, 1024, 1, 1, 2, null, false)); + + // create a non-local ANY request that can span anything + out.add(FederationPoliciesTestUtil.createResourceRequest(2L, + ResourceRequest.ANY, 1024, 1, 1, 100, null, true)); + + // create a single container request in sc0 with relaxed locality + out.add(FederationPoliciesTestUtil.createResourceRequest(3L, + "subcluster0-rack0-host0", 1024, 1, 1, 1, null, true)); + out.add(FederationPoliciesTestUtil.createResourceRequest(3L, + "subcluster0-rack0", 1024, 1, 1, 1, null, true)); + out.add(FederationPoliciesTestUtil.createResourceRequest(3L, + ResourceRequest.ANY, 1024, 1, 1, 1, null, true)); + + // create a request of an unknown node/rack and expect this to show up + // in homesubcluster + out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode", + 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack", + 1024, 1, 1, 1, null, false)); + out.add(FederationPoliciesTestUtil.createResourceRequest(4L, + ResourceRequest.ANY, 1024, 1, 1, 1, null, false)); + + return out; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index 9e94f725875..906e35fe787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -17,6 +17,9 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -29,12 +32,9 @@ import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; - /** - * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the - * load is properly considered for allocation. + * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load + * is properly considered for allocation. */ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { @@ -47,12 +47,10 @@ public void setUp() throws Exception { // simulate 20 active subclusters for (int i = 0; i < 20; i++) { - SubClusterIdInfo sc = - new SubClusterIdInfo(String.format("sc%02d", i)); + SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i)); SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1, - SubClusterState.SC_RUNNING, -1, - generateClusterMetricsInfo(i)); + SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i)); getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); float weight = getRand().nextInt(2); if (i == 5) { @@ -76,7 +74,7 @@ public void setUp() throws Exception { private String generateClusterMetricsInfo(int id) { long mem = 1024 * getRand().nextInt(277 * 100 - 1); - //plant a best cluster + // plant a best cluster if (id == 5) { mem = 1024 * 277 * 100; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index ff5175d01b9..eefcfd9c557 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -16,6 +16,12 @@ */ package org.apache.hadoop.yarn.server.federation.policies.router; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -28,12 +34,6 @@ import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Simple test class for the {@link PriorityRouterPolicy}. Tests that the * weights are correctly used for ordering the choice of sub-clusters. @@ -72,8 +72,7 @@ public void setUp() throws Exception { getPolicyInfo().setRouterPolicyWeights(routerWeights); getPolicyInfo().setAMRMPolicyWeights(amrmWeights); FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), - getActiveSubclusters()); + getPolicyInfo(), getActiveSubclusters()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index a612685b366..78967d02093 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -17,6 +17,13 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; @@ -29,13 +36,6 @@ import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large * number of randomized tests to check we are weighiting correctly even if @@ -71,8 +71,7 @@ public void setUp() throws Exception { getPolicyInfo().setAMRMPolicyWeights(amrmWeights); FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), - getActiveSubclusters()); + getPolicyInfo(), getActiveSubclusters()); } @@ -88,8 +87,8 @@ public void testClusterChosenWithRightProbability() throws YarnException { float numberOfDraws = 1000000; for (float i = 0; i < numberOfDraws; i++) { - SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()). - getHomeSubcluster(getApplicationSubmissionContext()); + SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()) + .getHomeSubcluster(getApplicationSubmissionContext()); counter.get(chosenId).incrementAndGet(); } @@ -113,13 +112,15 @@ public void testClusterChosenWithRightProbability() throws YarnException { if (getActiveSubclusters().containsKey(counterEntry.getKey())) { Assert.assertTrue( "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight - + " expected weight: " + expectedWeight, expectedWeight == 0 || - (actualWeight / expectedWeight) < 1.1 - && (actualWeight / expectedWeight) > 0.9); + + " expected weight: " + expectedWeight, + expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1 + && (actualWeight / expectedWeight) > 0.9); } else { - Assert.assertTrue( - "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight - + " expected weight: " + expectedWeight, actualWeight == 0); + Assert + .assertTrue( + "Id " + counterEntry.getKey() + " Actual weight: " + + actualWeight + " expected weight: " + expectedWeight, + actualWeight == 0); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index f901329dea6..87ed8d14b60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.utils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.*; +import org.apache.hadoop.yarn.util.Records; import java.net.URL; import java.nio.ByteBuffer; @@ -48,6 +50,68 @@ private FederationPoliciesTestUtil() { // disabled. } + private static final String FEDR_NODE_PREFIX = "fedr-test-node-"; + + + public static List createResourceRequests(String[] hosts, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws YarnException { + List reqs = new ArrayList(); + for (String host : hosts) { + ResourceRequest hostReq = + createResourceRequest(host, memory, vCores, priority, containers, + labelExpression, relaxLocality); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceRequest("/default-rack", memory, vCores, priority, + containers, labelExpression, relaxLocality); + reqs.add(rackReq); + } + + ResourceRequest offRackReq = + createResourceRequest(ResourceRequest.ANY, memory, vCores, priority, + containers, labelExpression, relaxLocality); + reqs.add(offRackReq); + return reqs; + } + + protected static ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + boolean relaxLocality) throws YarnException { + return createResourceRequest(resource, memory, vCores, priority, containers, + null, relaxLocality); + } + + @SuppressWarnings("checkstyle:parameternumber") + public static ResourceRequest createResourceRequest(long id, String resource, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws YarnException { + ResourceRequest out = + createResourceRequest(resource, memory, vCores, priority, containers, + labelExpression, relaxLocality); + out.setAllocationRequestId(id); + return out; + } + + public static ResourceRequest createResourceRequest(String resource, + int memory, int vCores, int priority, int containers, + String labelExpression, boolean relaxLocality) throws YarnException { + ResourceRequest req = Records.newRecord(ResourceRequest.class); + req.setResourceName(resource); + req.setNumContainers(containers); + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + req.setPriority(pri); + Resource capability = Records.newRecord(Resource.class); + capability.setMemorySize(memory); + capability.setVirtualCores(vCores); + req.setCapability(capability); + if (labelExpression != null) { + req.setNodeLabelExpression(labelExpression); + } + req.setRelaxLocality(relaxLocality); + return req; + } public static void initializePolicyContext( FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes index e4d6112baa5..2b7e2372f1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes @@ -1,4 +1,8 @@ node1,subcluster1,rack1 node2 , subcluster2, RACK1 noDE3,subcluster3, rack2 -node4, subcluster3, rack2 \ No newline at end of file +node4, subcluster3, rack2 +subcluster0-rack0-host0,subcluster0, subcluster0-rack0 +Subcluster1-RACK1-HOST1,subcluster1, subCluster1-RACK1 +SUBCLUSTER1-RACK1-HOST2,subcluster1, subCluster1-RACK1 +SubCluster2-RACK3-HOST3,subcluster2, subcluster2-rack3