diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index bbd03a9bd4a..ee51094e611 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -310,6 +310,15 @@
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5dcb993dabf..33bde5491af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2594,6 +2594,19 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list";
+ public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
+
+ public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ + "policy-manager";
+
+ public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
+ + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager";
+
+ public static final String FEDERATION_POLICY_MANAGER_PARAMS =
+ FEDERATION_PREFIX + "policy-manager-params";
+
+ public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 3f3a06c5fba..6e33c0aa40e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -78,6 +78,18 @@ public void initializeMemberVariables() {
configurationPropsToSkipCompare
.add(YarnConfiguration.RM_EPOCH);
+ // Federation policies configs to be ignored
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_POLICY_MANAGER);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
configurationPropsToSkipCompare.add(YarnConfiguration.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
index 1b83bbc8602..3c44e7e1dbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
@@ -57,7 +57,7 @@ public static void validate(
if (policyContext.getSubClusterPolicyConfiguration() == null) {
throw new FederationPolicyInitializationException(
- "The FederationSubclusterResolver provided is null. Cannot "
+ "The SubClusterPolicyConfiguration provided is null. Cannot "
+ "reinitalize successfully.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
new file mode 100644
index 00000000000..ebdcf420d40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link PriorityRouterPolicy} for the router and a
+ * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
+
+ private WeightedPolicyInfo weightedPolicyInfo;
+
+ public PriorityBroadcastPolicyManager() {
+ // this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = PriorityRouterPolicy.class;
+ amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+ weightedPolicyInfo = new WeightedPolicyInfo();
+ }
+
+ @Override
+ public SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException {
+ ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+ return SubClusterPolicyConfiguration.newInstance(getQueue(),
+ this.getClass().getCanonicalName(), buf);
+ }
+
+ @VisibleForTesting
+ public WeightedPolicyInfo getWeightedPolicyInfo() {
+ return weightedPolicyInfo;
+ }
+
+ @VisibleForTesting
+ public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
+ this.weightedPolicyInfo = weightedPolicyInfo;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
new file mode 100644
index 00000000000..a3fd15a3b51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class provides a facade to the policy subsystem, and handles the
+ * lifecycle of policies (e.g., refresh from remote, default behaviors etc.).
+ */
+public class RouterPolicyFacade {
+
+ private static final Log LOG =
+ LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class);
+
+ private final SubClusterResolver subClusterResolver;
+ private final FederationStateStoreFacade federationFacade;
+ private Map globalConfMap;
+
+ @VisibleForTesting
+ Map globalPolicyMap;
+
+ public RouterPolicyFacade(YarnConfiguration conf,
+ FederationStateStoreFacade facade, SubClusterResolver resolver,
+ SubClusterId homeSubcluster)
+ throws FederationPolicyInitializationException {
+
+ this.federationFacade = facade;
+ this.subClusterResolver = resolver;
+ this.globalConfMap = new ConcurrentHashMap<>();
+ this.globalPolicyMap = new ConcurrentHashMap<>();
+
+ // load default behavior from store if possible
+ String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+ SubClusterPolicyConfiguration configuration = null;
+ try {
+ configuration = federationFacade.getPolicyConfiguration(defaulKey);
+ } catch (YarnException e) {
+ LOG.warn("No fallback behavior defined in store, defaulting to XML "
+ + "configuration fallback behavior.");
+ }
+
+ // or from XML conf otherwise.
+ if (configuration == null) {
+ String defaultFederationPolicyManager =
+ conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+ String defaultPolicyParamString =
+ conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+ ByteBuffer defaultPolicyParam = ByteBuffer
+ .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
+
+ configuration = SubClusterPolicyConfiguration.newInstance(defaulKey,
+ defaultFederationPolicyManager, defaultPolicyParam);
+ }
+
+ // construct the required policy manager
+ FederationPolicyInitializationContext fallbackContext =
+ new FederationPolicyInitializationContext(configuration,
+ subClusterResolver, federationFacade, homeSubcluster);
+ FederationPolicyManager fallbackPolicyManager =
+ instantiatePolicyManager(configuration.getType());
+ fallbackPolicyManager.setQueue(defaulKey);
+
+ // add to the cache the fallback behavior
+ globalConfMap.put(defaulKey,
+ fallbackContext.getSubClusterPolicyConfiguration());
+ globalPolicyMap.put(defaulKey,
+ fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
+
+ }
+
+ /**
+ * This method provides a wrapper of all policy functionalities for routing .
+ * Internally it manages configuration changes, and policy init/reinit.
+ *
+ * @param appSubmissionContext the application to route.
+ *
+ * @return the id of the subcluster that will be the "home" for this
+ * application.
+ *
+ * @throws YarnException if there are issues initializing policies, or no
+ * valid sub-cluster id could be found for this app.
+ */
+ public SubClusterId getHomeSubcluster(
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+
+ // the maps are concurrent, but we need to protect from reset()
+ // reinitialization mid-execution by creating a new reference local to this
+ // method.
+ Map cachedConfs = globalConfMap;
+ Map policyMap = globalPolicyMap;
+
+ if (appSubmissionContext == null) {
+ throw new FederationPolicyException(
+ "The ApplicationSubmissionContext " + "cannot be null.");
+ }
+
+ String queue = appSubmissionContext.getQueue();
+
+ // respecting YARN behavior we assume default queue if the queue is not
+ // specified. This also ensures that "null" can be used as a key to get the
+ // default behavior.
+ if (queue == null) {
+ queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
+ }
+
+ // the facade might cache this request, based on its parameterization
+ SubClusterPolicyConfiguration configuration = null;
+
+ try {
+ configuration = federationFacade.getPolicyConfiguration(queue);
+ } catch (YarnException e) {
+ LOG.debug(e);
+ }
+
+ // If there is no policy configured for this queue, fallback to the baseline
+ // policy that is configured either in the store or via XML config (and
+ // cached)
+ if (configuration == null) {
+ try {
+ LOG.warn("There is no policies configured for queue: " + queue + " we"
+ + " fallback to default policy for: "
+ + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+
+ queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+ configuration = federationFacade.getPolicyConfiguration(
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+ } catch (YarnException e) {
+ // the fallback is not configure via store, but via XML, using
+ // previously loaded configuration.
+ configuration =
+ cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+ }
+ }
+
+ // if the configuration has changed since last loaded, reinit the policy
+ // based on current configuration
+ if (!cachedConfs.containsKey(queue)
+ || !cachedConfs.get(queue).equals(configuration)) {
+ singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
+ }
+
+ FederationRouterPolicy policy = policyMap.get(queue);
+ if (policy == null) {
+ // this should never happen, as the to maps are updated together
+ throw new FederationPolicyException("No FederationRouterPolicy found "
+ + "for queue: " + appSubmissionContext.getQueue() + " (for "
+ + "application: " + appSubmissionContext.getApplicationId() + ") "
+ + "and no default specified.");
+ }
+
+ return policy.getHomeSubcluster(appSubmissionContext);
+ }
+
+ /**
+ * This method reinitializes a policy and loads it in the policyMap.
+ *
+ * @param queue the queue to initialize a policy for.
+ * @param conf the configuration to use for initalization.
+ *
+ * @throws FederationPolicyInitializationException if initialization fails.
+ */
+ private void singlePolicyReinit(Map policyMap,
+ Map cachedConfs, String queue,
+ SubClusterPolicyConfiguration conf)
+ throws FederationPolicyInitializationException {
+
+ FederationPolicyInitializationContext context =
+ new FederationPolicyInitializationContext(conf, subClusterResolver,
+ federationFacade, null);
+ String newType = context.getSubClusterPolicyConfiguration().getType();
+ FederationRouterPolicy routerPolicy = policyMap.get(queue);
+
+ FederationPolicyManager federationPolicyManager =
+ instantiatePolicyManager(newType);
+ // set queue, reinit policy if required (implementation lazily check
+ // content of conf), and cache it
+ federationPolicyManager.setQueue(queue);
+ routerPolicy =
+ federationPolicyManager.getRouterPolicy(context, routerPolicy);
+
+ // we need the two put to be atomic (across multiple threads invoking
+ // this and reset operations)
+ synchronized (this) {
+ policyMap.put(queue, routerPolicy);
+ cachedConfs.put(queue, conf);
+ }
+ }
+
+ private static FederationPolicyManager instantiatePolicyManager(
+ String newType) throws FederationPolicyInitializationException {
+ FederationPolicyManager federationPolicyManager = null;
+ try {
+ // create policy instance and set queue
+ Class c = Class.forName(newType);
+ federationPolicyManager = (FederationPolicyManager) c.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new FederationPolicyInitializationException(e);
+ } catch (InstantiationException e) {
+ throw new FederationPolicyInitializationException(e);
+ } catch (IllegalAccessException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+ return federationPolicyManager;
+ }
+
+ /**
+ * This method flushes all cached configurations and policies. This should be
+ * invoked if the facade remains activity after very large churn of queues in
+ * the system.
+ */
+ public synchronized void reset() {
+
+ // remember the fallBack
+ SubClusterPolicyConfiguration conf =
+ globalConfMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+ FederationRouterPolicy policy =
+ globalPolicyMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
+
+ globalConfMap = new ConcurrentHashMap<>();
+ globalPolicyMap = new ConcurrentHashMap<>();
+
+ // add to the cache a fallback with keyword null
+ globalConfMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, conf);
+ globalPolicyMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY,
+ policy);
+
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
index 62eb03bafc0..e7b8afe1490 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
@@ -20,7 +20,7 @@
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -100,7 +100,7 @@ public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
final byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
- String params = new String(bytes, Charset.forName("UTF-8"));
+ String params = new String(bytes, StandardCharsets.UTF_8);
WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON(
new StringReader(params), WeightedPolicyInfo.class);
@@ -164,7 +164,7 @@ public ByteBuffer toByteBuffer()
}
try {
String s = toJSONString();
- return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8")));
+ return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
} catch (JAXBException j) {
throw new FederationPolicyInitializationException(j);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 66a0b605e1a..9b794de23c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -262,12 +263,17 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(
if (isCachingEnabled()) {
return getPoliciesConfigurations().get(queue);
} else {
- return stateStore
- .getPolicyConfiguration(
- GetSubClusterPolicyConfigurationRequest.newInstance(queue))
- .getPolicyConfiguration();
- }
+ GetSubClusterPolicyConfigurationResponse response =
+ stateStore.getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest.newInstance(queue));
+ if (response == null) {
+ throw new YarnException("The stateStore returned a null for "
+ + "GetSubClusterPolicyConfigurationResponse for queue " + queue);
+ } else {
+ return response.getPolicyConfiguration();
+ }
+ }
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
new file mode 100644
index 00000000000..5e5bc83d00e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test of {@link PriorityBroadcastPolicyManager}.
+ */
+public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest {
+
+ private WeightedPolicyInfo policyInfo;
+
+ @Before
+ public void setup() {
+ // configure a policy
+
+ wfp = new PriorityBroadcastPolicyManager();
+ wfp.setQueue("queue1");
+ SubClusterId sc1 = SubClusterId.newInstance("sc1");
+ SubClusterId sc2 = SubClusterId.newInstance("sc2");
+ policyInfo = new WeightedPolicyInfo();
+
+ Map routerWeights = new HashMap<>();
+ routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
+ routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
+ policyInfo.setRouterPolicyWeights(routerWeights);
+
+ ((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo);
+
+ // set expected params that the base test class will use for tests
+ expectedPolicyManager = PriorityBroadcastPolicyManager.class;
+ expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
+ expectedRouterPolicy = PriorityRouterPolicy.class;
+ }
+
+ @Test
+ public void testPolicyInfoSetCorrectly() throws Exception {
+ serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+ expectedAMRMProxyPolicy, expectedRouterPolicy);
+
+ // check the policyInfo propagates through ser/der correctly
+ Assert.assertEquals(
+ ((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(),
+ policyInfo);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
new file mode 100644
index 00000000000..4975a9fb81d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test of {@link RouterPolicyFacade}.
+ */
+public class TestRouterPolicyFacade {
+
+ private RouterPolicyFacade routerFacade;
+ private List subClusterIds;
+ private FederationStateStore store;
+ private String queue1 = "queue1";
+ private String defQueueKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+
+ @Before
+ public void setup() throws YarnException {
+
+ // setting up a store and its facade (with caching off)
+ FederationStateStoreFacade fedFacade =
+ FederationStateStoreFacade.getInstance();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0");
+ store = new MemoryFederationStateStore();
+ store.init(conf);
+ fedFacade.reinitialize(store, conf);
+
+ FederationStateStoreTestUtil storeTestUtil =
+ new FederationStateStoreTestUtil(store);
+ storeTestUtil.registerSubClusters(10);
+
+ subClusterIds = storeTestUtil.getAllSubClusterIds(true);
+ store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+ .newInstance(getUniformPolicy(queue1)));
+
+ SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
+ routerFacade = new RouterPolicyFacade(new YarnConfiguration(), fedFacade,
+ resolver, subClusterIds.get(0));
+ }
+
+ @Test
+ public void testConfigurationUpdate() throws YarnException {
+
+ // in this test we see what happens when the configuration is changed
+ // between calls. We achieve this by changing what is in the store.
+
+ ApplicationSubmissionContext applicationSubmissionContext =
+ mock(ApplicationSubmissionContext.class);
+ when(applicationSubmissionContext.getQueue()).thenReturn(queue1);
+
+ // first call runs using standard UniformRandomRouterPolicy
+ SubClusterId chosen =
+ routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ Assert.assertTrue(subClusterIds.contains(chosen));
+ Assert.assertTrue(routerFacade.globalPolicyMap
+ .get(queue1) instanceof UniformRandomRouterPolicy);
+
+ // then the operator changes how queue1 is routed setting it to
+ // PriorityRouterPolicy with weights favoring the first subcluster in
+ // subClusterIds.
+ store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+ .newInstance(getPriorityPolicy(queue1)));
+
+ // second call is routed by new policy PriorityRouterPolicy
+ chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ Assert.assertTrue(chosen.equals(subClusterIds.get(0)));
+ Assert.assertTrue(routerFacade.globalPolicyMap
+ .get(queue1) instanceof PriorityRouterPolicy);
+ }
+
+ @Test
+ public void testGetHomeSubcluster() throws YarnException {
+
+ ApplicationSubmissionContext applicationSubmissionContext =
+ mock(ApplicationSubmissionContext.class);
+ when(applicationSubmissionContext.getQueue()).thenReturn(queue1);
+
+ // the facade only contains the fallback behavior
+ Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey)
+ && routerFacade.globalPolicyMap.size() == 1);
+
+ // when invoked it returns the expected SubClusterId.
+ SubClusterId chosen =
+ routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ Assert.assertTrue(subClusterIds.contains(chosen));
+
+ // now the caching of policies must have added an entry for this queue
+ Assert.assertTrue(routerFacade.globalPolicyMap.size() == 2);
+
+ // after the facade is used the policyMap contains the expected policy type.
+ Assert.assertTrue(routerFacade.globalPolicyMap
+ .get(queue1) instanceof UniformRandomRouterPolicy);
+
+ // the facade is again empty after reset
+ routerFacade.reset();
+ // the facade only contains the fallback behavior
+ Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey)
+ && routerFacade.globalPolicyMap.size() == 1);
+
+ }
+
+ @Test
+ public void testFallbacks() throws YarnException {
+
+ // this tests the behavior of the system when the queue requested is
+ // not configured (or null) and there is no default policy configured
+ // for DEFAULT_FEDERATION_POLICY_KEY (*). This is our second line of
+ // defense.
+
+ ApplicationSubmissionContext applicationSubmissionContext =
+ mock(ApplicationSubmissionContext.class);
+
+ // The facade answers also for non-initialized policies (using the
+ // defaultPolicy)
+ String uninitQueue = "non-initialized-queue";
+ when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
+ SubClusterId chosen =
+ routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ Assert.assertTrue(subClusterIds.contains(chosen));
+ Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
+
+ // empty string
+ when(applicationSubmissionContext.getQueue()).thenReturn("");
+ chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ Assert.assertTrue(subClusterIds.contains(chosen));
+ Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
+
+ // null queue also falls back to default
+ when(applicationSubmissionContext.getQueue()).thenReturn(null);
+ chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ Assert.assertTrue(subClusterIds.contains(chosen));
+ Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
+
+ }
+
+ public static SubClusterPolicyConfiguration getUniformPolicy(String queue)
+ throws FederationPolicyInitializationException {
+
+ // we go through standard lifecycle instantiating a policyManager and
+ // configuring it and serializing it to a conf.
+ UniformBroadcastPolicyManager wfp = new UniformBroadcastPolicyManager();
+ wfp.setQueue(queue);
+
+ SubClusterPolicyConfiguration fpc = wfp.serializeConf();
+
+ return fpc;
+ }
+
+ public SubClusterPolicyConfiguration getPriorityPolicy(String queue)
+ throws FederationPolicyInitializationException {
+
+ // we go through standard lifecycle instantiating a policyManager and
+ // configuring it and serializing it to a conf.
+ PriorityBroadcastPolicyManager wfp = new PriorityBroadcastPolicyManager();
+
+ // equal weight to all subcluster
+ Map routerWeights = new HashMap<>();
+ for (SubClusterId s : subClusterIds) {
+ routerWeights.put(new SubClusterIdInfo(s), 0.9f / subClusterIds.size());
+ }
+
+ // beside the first one who gets more weight
+ SubClusterIdInfo favorite = new SubClusterIdInfo((subClusterIds.get(0)));
+ routerWeights.put(favorite, (0.1f + 0.9f / subClusterIds.size()));
+
+ WeightedPolicyInfo policyInfo = new WeightedPolicyInfo();
+ policyInfo.setRouterPolicyWeights(routerWeights);
+ wfp.setWeightedPolicyInfo(policyInfo);
+ wfp.setQueue(queue);
+
+ // serializeConf it in a context
+ SubClusterPolicyConfiguration fpc = wfp.serializeConf();
+
+ return fpc;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
index c179521ea6a..649a61b4ef8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.federation.utils;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -29,6 +31,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -62,8 +65,8 @@ private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String webAppAddress = "1.2.3.4:4";
return SubClusterInfo.newInstance(subClusterId, amRMAddress,
- clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
- CLOCK.getTime(), "capability");
+ clientRMAddress, rmAdminAddress, webAppAddress,
+ SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
}
private void registerSubCluster(SubClusterId subClusterId)
@@ -97,6 +100,21 @@ public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException {
}
}
+ public List getAllSubClusterIds(
+ boolean filterInactiveSubclusters) throws YarnException {
+
+ List infos = stateStore
+ .getSubClusters(
+ GetSubClustersInfoRequest.newInstance(filterInactiveSubclusters))
+ .getSubClusters();
+ List ids = new ArrayList<>();
+ for (SubClusterInfo s : infos) {
+ ids.add(s.getSubClusterId());
+ }
+
+ return ids;
+ }
+
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) {
return SubClusterPolicyConfiguration.newInstance(queueName, policyType,