YARN-5634. Simplify initialization/use of RouterPolicy via a RouterPolicyFacade. (Carlo Curino via Subru).

(cherry picked from commit d7672ce2bd)
This commit is contained in:
Subru Krishnan 2016-11-16 19:39:25 -08:00 committed by Carlo Curino
parent 93d9fdeca6
commit 083a05bc14
11 changed files with 693 additions and 11 deletions
hadoop-yarn-project/hadoop-yarn
dev-support
hadoop-yarn-api/src
main/java/org/apache/hadoop/yarn/conf
test/java/org/apache/hadoop/yarn/conf
hadoop-yarn-server/hadoop-yarn-server-common/src

View File

@ -310,6 +310,15 @@
<Bug pattern="IS2_INCONSISTENT_SYNC"/> <Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade"/>
<Or>
<Field name="globalConfMap"/>
<Field name="globalPolicyMap"/>
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<!-- Don't care if putIfAbsent value is ignored --> <!-- Don't care if putIfAbsent value is ignored -->
<Match> <Match>
<Package name="org.apache.hadoop.yarn.factories.impl.pb" /> <Package name="org.apache.hadoop.yarn.factories.impl.pb" />

View File

@ -2594,6 +2594,19 @@ public class YarnConfiguration extends Configuration {
public static final String FEDERATION_MACHINE_LIST = public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list"; FEDERATION_PREFIX + "machine-list";
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ "policy-manager";
public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
+ ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager";
public static final String FEDERATION_POLICY_MANAGER_PARAMS =
FEDERATION_PREFIX + "policy-manager-params";
public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
//////////////////////////////// ////////////////////////////////
// Other Configs // Other Configs
//////////////////////////////// ////////////////////////////////

View File

@ -78,6 +78,18 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare configurationPropsToSkipCompare
.add(YarnConfiguration.RM_EPOCH); .add(YarnConfiguration.RM_EPOCH);
// Federation policies configs to be ignored
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_POLICY_MANAGER);
configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
// Ignore blacklisting nodes for AM failures feature since it is still a // Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress" // "work in progress"
configurationPropsToSkipCompare.add(YarnConfiguration. configurationPropsToSkipCompare.add(YarnConfiguration.

View File

@ -57,7 +57,7 @@ public final class FederationPolicyInitializationContextValidator {
if (policyContext.getSubClusterPolicyConfiguration() == null) { if (policyContext.getSubClusterPolicyConfiguration() == null) {
throw new FederationPolicyInitializationException( throw new FederationPolicyInitializationException(
"The FederationSubclusterResolver provided is null. Cannot " "The SubClusterPolicyConfiguration provided is null. Cannot "
+ "reinitalize successfully."); + "reinitalize successfully.");
} }

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import com.google.common.annotations.VisibleForTesting;
/**
* Policy that allows operator to configure "weights" for routing. This picks a
* {@link PriorityRouterPolicy} for the router and a
* {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
* work together.
*/
public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
private WeightedPolicyInfo weightedPolicyInfo;
public PriorityBroadcastPolicyManager() {
// this structurally hard-codes two compatible policies for Router and
// AMRMProxy.
routerFederationPolicy = PriorityRouterPolicy.class;
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
weightedPolicyInfo = new WeightedPolicyInfo();
}
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
return SubClusterPolicyConfiguration.newInstance(getQueue(),
this.getClass().getCanonicalName(), buf);
}
@VisibleForTesting
public WeightedPolicyInfo getWeightedPolicyInfo() {
return weightedPolicyInfo;
}
@VisibleForTesting
public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
this.weightedPolicyInfo = weightedPolicyInfo;
}
}

View File

@ -0,0 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import com.google.common.annotations.VisibleForTesting;
/**
* This class provides a facade to the policy subsystem, and handles the
* lifecycle of policies (e.g., refresh from remote, default behaviors etc.).
*/
public class RouterPolicyFacade {
private static final Log LOG =
LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class);
private final SubClusterResolver subClusterResolver;
private final FederationStateStoreFacade federationFacade;
private Map<String, SubClusterPolicyConfiguration> globalConfMap;
@VisibleForTesting
Map<String, FederationRouterPolicy> globalPolicyMap;
public RouterPolicyFacade(YarnConfiguration conf,
FederationStateStoreFacade facade, SubClusterResolver resolver,
SubClusterId homeSubcluster)
throws FederationPolicyInitializationException {
this.federationFacade = facade;
this.subClusterResolver = resolver;
this.globalConfMap = new ConcurrentHashMap<>();
this.globalPolicyMap = new ConcurrentHashMap<>();
// load default behavior from store if possible
String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
SubClusterPolicyConfiguration configuration = null;
try {
configuration = federationFacade.getPolicyConfiguration(defaulKey);
} catch (YarnException e) {
LOG.warn("No fallback behavior defined in store, defaulting to XML "
+ "configuration fallback behavior.");
}
// or from XML conf otherwise.
if (configuration == null) {
String defaultFederationPolicyManager =
conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
String defaultPolicyParamString =
conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
ByteBuffer defaultPolicyParam = ByteBuffer
.wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
configuration = SubClusterPolicyConfiguration.newInstance(defaulKey,
defaultFederationPolicyManager, defaultPolicyParam);
}
// construct the required policy manager
FederationPolicyInitializationContext fallbackContext =
new FederationPolicyInitializationContext(configuration,
subClusterResolver, federationFacade, homeSubcluster);
FederationPolicyManager fallbackPolicyManager =
instantiatePolicyManager(configuration.getType());
fallbackPolicyManager.setQueue(defaulKey);
// add to the cache the fallback behavior
globalConfMap.put(defaulKey,
fallbackContext.getSubClusterPolicyConfiguration());
globalPolicyMap.put(defaulKey,
fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
}
/**
* This method provides a wrapper of all policy functionalities for routing .
* Internally it manages configuration changes, and policy init/reinit.
*
* @param appSubmissionContext the application to route.
*
* @return the id of the subcluster that will be the "home" for this
* application.
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this app.
*/
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
// the maps are concurrent, but we need to protect from reset()
// reinitialization mid-execution by creating a new reference local to this
// method.
Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap;
Map<String, FederationRouterPolicy> policyMap = globalPolicyMap;
if (appSubmissionContext == null) {
throw new FederationPolicyException(
"The ApplicationSubmissionContext " + "cannot be null.");
}
String queue = appSubmissionContext.getQueue();
// respecting YARN behavior we assume default queue if the queue is not
// specified. This also ensures that "null" can be used as a key to get the
// default behavior.
if (queue == null) {
queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
}
// the facade might cache this request, based on its parameterization
SubClusterPolicyConfiguration configuration = null;
try {
configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) {
LOG.debug(e);
}
// If there is no policy configured for this queue, fallback to the baseline
// policy that is configured either in the store or via XML config (and
// cached)
if (configuration == null) {
try {
LOG.warn("There is no policies configured for queue: " + queue + " we"
+ " fallback to default policy for: "
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
configuration = federationFacade.getPolicyConfiguration(
YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
} catch (YarnException e) {
// the fallback is not configure via store, but via XML, using
// previously loaded configuration.
configuration =
cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
}
}
// if the configuration has changed since last loaded, reinit the policy
// based on current configuration
if (!cachedConfs.containsKey(queue)
|| !cachedConfs.get(queue).equals(configuration)) {
singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
}
FederationRouterPolicy policy = policyMap.get(queue);
if (policy == null) {
// this should never happen, as the to maps are updated together
throw new FederationPolicyException("No FederationRouterPolicy found "
+ "for queue: " + appSubmissionContext.getQueue() + " (for "
+ "application: " + appSubmissionContext.getApplicationId() + ") "
+ "and no default specified.");
}
return policy.getHomeSubcluster(appSubmissionContext);
}
/**
* This method reinitializes a policy and loads it in the policyMap.
*
* @param queue the queue to initialize a policy for.
* @param conf the configuration to use for initalization.
*
* @throws FederationPolicyInitializationException if initialization fails.
*/
private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap,
Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
SubClusterPolicyConfiguration conf)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(conf, subClusterResolver,
federationFacade, null);
String newType = context.getSubClusterPolicyConfiguration().getType();
FederationRouterPolicy routerPolicy = policyMap.get(queue);
FederationPolicyManager federationPolicyManager =
instantiatePolicyManager(newType);
// set queue, reinit policy if required (implementation lazily check
// content of conf), and cache it
federationPolicyManager.setQueue(queue);
routerPolicy =
federationPolicyManager.getRouterPolicy(context, routerPolicy);
// we need the two put to be atomic (across multiple threads invoking
// this and reset operations)
synchronized (this) {
policyMap.put(queue, routerPolicy);
cachedConfs.put(queue, conf);
}
}
private static FederationPolicyManager instantiatePolicyManager(
String newType) throws FederationPolicyInitializationException {
FederationPolicyManager federationPolicyManager = null;
try {
// create policy instance and set queue
Class c = Class.forName(newType);
federationPolicyManager = (FederationPolicyManager) c.newInstance();
} catch (ClassNotFoundException e) {
throw new FederationPolicyInitializationException(e);
} catch (InstantiationException e) {
throw new FederationPolicyInitializationException(e);
} catch (IllegalAccessException e) {
throw new FederationPolicyInitializationException(e);
}
return federationPolicyManager;
}
/**
* This method flushes all cached configurations and policies. This should be
* invoked if the facade remains activity after very large churn of queues in
* the system.
*/
public synchronized void reset() {
// remember the fallBack
SubClusterPolicyConfiguration conf =
globalConfMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
FederationRouterPolicy policy =
globalPolicyMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
globalConfMap = new ConcurrentHashMap<>();
globalPolicyMap = new ConcurrentHashMap<>();
// add to the cache a fallback with keyword null
globalConfMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, conf);
globalPolicyMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY,
policy);
}
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.federation.policies.dao;
import java.io.StringReader; import java.io.StringReader;
import java.io.StringWriter; import java.io.StringWriter;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -100,7 +100,7 @@ public class WeightedPolicyInfo {
JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller(); JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
final byte[] bytes = new byte[bb.remaining()]; final byte[] bytes = new byte[bb.remaining()];
bb.get(bytes); bb.get(bytes);
String params = new String(bytes, Charset.forName("UTF-8")); String params = new String(bytes, StandardCharsets.UTF_8);
WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON( WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON(
new StringReader(params), WeightedPolicyInfo.class); new StringReader(params), WeightedPolicyInfo.class);
@ -164,7 +164,7 @@ public class WeightedPolicyInfo {
} }
try { try {
String s = toJSONString(); String s = toJSONString();
return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8"))); return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
} catch (JAXBException j) { } catch (JAXBException j) {
throw new FederationPolicyInitializationException(j); throw new FederationPolicyInitializationException(j);
} }

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoR
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -262,12 +263,17 @@ public final class FederationStateStoreFacade {
if (isCachingEnabled()) { if (isCachingEnabled()) {
return getPoliciesConfigurations().get(queue); return getPoliciesConfigurations().get(queue);
} else { } else {
return stateStore
.getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest.newInstance(queue))
.getPolicyConfiguration();
}
GetSubClusterPolicyConfigurationResponse response =
stateStore.getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest.newInstance(queue));
if (response == null) {
throw new YarnException("The stateStore returned a null for "
+ "GetSubClusterPolicyConfigurationResponse for queue " + queue);
} else {
return response.getPolicyConfiguration();
}
}
} }
/** /**

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test of {@link PriorityBroadcastPolicyManager}.
*/
public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest {
private WeightedPolicyInfo policyInfo;
@Before
public void setup() {
// configure a policy
wfp = new PriorityBroadcastPolicyManager();
wfp.setQueue("queue1");
SubClusterId sc1 = SubClusterId.newInstance("sc1");
SubClusterId sc2 = SubClusterId.newInstance("sc2");
policyInfo = new WeightedPolicyInfo();
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
policyInfo.setRouterPolicyWeights(routerWeights);
((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo);
// set expected params that the base test class will use for tests
expectedPolicyManager = PriorityBroadcastPolicyManager.class;
expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
expectedRouterPolicy = PriorityRouterPolicy.class;
}
@Test
public void testPolicyInfoSetCorrectly() throws Exception {
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
expectedAMRMProxyPolicy, expectedRouterPolicy);
// check the policyInfo propagates through ser/der correctly
Assert.assertEquals(
((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(),
policyInfo);
}
}

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test of {@link RouterPolicyFacade}.
*/
public class TestRouterPolicyFacade {
private RouterPolicyFacade routerFacade;
private List<SubClusterId> subClusterIds;
private FederationStateStore store;
private String queue1 = "queue1";
private String defQueueKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
@Before
public void setup() throws YarnException {
// setting up a store and its facade (with caching off)
FederationStateStoreFacade fedFacade =
FederationStateStoreFacade.getInstance();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0");
store = new MemoryFederationStateStore();
store.init(conf);
fedFacade.reinitialize(store, conf);
FederationStateStoreTestUtil storeTestUtil =
new FederationStateStoreTestUtil(store);
storeTestUtil.registerSubClusters(10);
subClusterIds = storeTestUtil.getAllSubClusterIds(true);
store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
.newInstance(getUniformPolicy(queue1)));
SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
routerFacade = new RouterPolicyFacade(new YarnConfiguration(), fedFacade,
resolver, subClusterIds.get(0));
}
@Test
public void testConfigurationUpdate() throws YarnException {
// in this test we see what happens when the configuration is changed
// between calls. We achieve this by changing what is in the store.
ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
when(applicationSubmissionContext.getQueue()).thenReturn(queue1);
// first call runs using standard UniformRandomRouterPolicy
SubClusterId chosen =
routerFacade.getHomeSubcluster(applicationSubmissionContext);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof UniformRandomRouterPolicy);
// then the operator changes how queue1 is routed setting it to
// PriorityRouterPolicy with weights favoring the first subcluster in
// subClusterIds.
store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
.newInstance(getPriorityPolicy(queue1)));
// second call is routed by new policy PriorityRouterPolicy
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
Assert.assertTrue(chosen.equals(subClusterIds.get(0)));
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof PriorityRouterPolicy);
}
@Test
public void testGetHomeSubcluster() throws YarnException {
ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
when(applicationSubmissionContext.getQueue()).thenReturn(queue1);
// the facade only contains the fallback behavior
Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey)
&& routerFacade.globalPolicyMap.size() == 1);
// when invoked it returns the expected SubClusterId.
SubClusterId chosen =
routerFacade.getHomeSubcluster(applicationSubmissionContext);
Assert.assertTrue(subClusterIds.contains(chosen));
// now the caching of policies must have added an entry for this queue
Assert.assertTrue(routerFacade.globalPolicyMap.size() == 2);
// after the facade is used the policyMap contains the expected policy type.
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof UniformRandomRouterPolicy);
// the facade is again empty after reset
routerFacade.reset();
// the facade only contains the fallback behavior
Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey)
&& routerFacade.globalPolicyMap.size() == 1);
}
@Test
public void testFallbacks() throws YarnException {
// this tests the behavior of the system when the queue requested is
// not configured (or null) and there is no default policy configured
// for DEFAULT_FEDERATION_POLICY_KEY (*). This is our second line of
// defense.
ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
// The facade answers also for non-initialized policies (using the
// defaultPolicy)
String uninitQueue = "non-initialized-queue";
when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
SubClusterId chosen =
routerFacade.getHomeSubcluster(applicationSubmissionContext);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
// empty string
when(applicationSubmissionContext.getQueue()).thenReturn("");
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
// null queue also falls back to default
when(applicationSubmissionContext.getQueue()).thenReturn(null);
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
}
public static SubClusterPolicyConfiguration getUniformPolicy(String queue)
throws FederationPolicyInitializationException {
// we go through standard lifecycle instantiating a policyManager and
// configuring it and serializing it to a conf.
UniformBroadcastPolicyManager wfp = new UniformBroadcastPolicyManager();
wfp.setQueue(queue);
SubClusterPolicyConfiguration fpc = wfp.serializeConf();
return fpc;
}
public SubClusterPolicyConfiguration getPriorityPolicy(String queue)
throws FederationPolicyInitializationException {
// we go through standard lifecycle instantiating a policyManager and
// configuring it and serializing it to a conf.
PriorityBroadcastPolicyManager wfp = new PriorityBroadcastPolicyManager();
// equal weight to all subcluster
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
for (SubClusterId s : subClusterIds) {
routerWeights.put(new SubClusterIdInfo(s), 0.9f / subClusterIds.size());
}
// beside the first one who gets more weight
SubClusterIdInfo favorite = new SubClusterIdInfo((subClusterIds.get(0)));
routerWeights.put(favorite, (0.1f + 0.9f / subClusterIds.size()));
WeightedPolicyInfo policyInfo = new WeightedPolicyInfo();
policyInfo.setRouterPolicyWeights(routerWeights);
wfp.setWeightedPolicyInfo(policyInfo);
wfp.setQueue(queue);
// serializeConf it in a context
SubClusterPolicyConfiguration fpc = wfp.serializeConf();
return fpc;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.federation.utils; package org.apache.hadoop.yarn.server.federation.utils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -29,6 +31,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHome
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -62,8 +65,8 @@ public class FederationStateStoreTestUtil {
String webAppAddress = "1.2.3.4:4"; String webAppAddress = "1.2.3.4:4";
return SubClusterInfo.newInstance(subClusterId, amRMAddress, return SubClusterInfo.newInstance(subClusterId, amRMAddress,
clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, clientRMAddress, rmAdminAddress, webAppAddress,
CLOCK.getTime(), "capability"); SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
} }
private void registerSubCluster(SubClusterId subClusterId) private void registerSubCluster(SubClusterId subClusterId)
@ -97,6 +100,21 @@ public class FederationStateStoreTestUtil {
} }
} }
public List<SubClusterId> getAllSubClusterIds(
boolean filterInactiveSubclusters) throws YarnException {
List<SubClusterInfo> infos = stateStore
.getSubClusters(
GetSubClustersInfoRequest.newInstance(filterInactiveSubclusters))
.getSubClusters();
List<SubClusterId> ids = new ArrayList<>();
for (SubClusterInfo s : infos) {
ids.add(s.getSubClusterId());
}
return ids;
}
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName, private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) { String policyType) {
return SubClusterPolicyConfiguration.newInstance(queueName, policyType, return SubClusterPolicyConfiguration.newInstance(queueName, policyType,