YARN-5391. PolicyManager to tie together Router/AMRM Federation policies. (Carlo Curino via Subru).
(cherry picked from commit 20893682eced98dfba55d88edd63296993087c85) (cherry picked from commit 93d9fdeca65a97434936ec9c2e25c362ee016783)
This commit is contained in:
parent
5c5ab53f3d
commit
b3a1ab711c
@ -0,0 +1,175 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class provides basic implementation for common methods that multiple
|
||||||
|
* policies will need to implement.
|
||||||
|
*/
|
||||||
|
public abstract class AbstractPolicyManager implements
|
||||||
|
FederationPolicyManager {
|
||||||
|
|
||||||
|
private String queue;
|
||||||
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
|
protected Class routerFederationPolicy;
|
||||||
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
|
protected Class amrmProxyFederationPolicy;
|
||||||
|
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(AbstractPolicyManager.class);
|
||||||
|
/**
|
||||||
|
* This default implementation validates the
|
||||||
|
* {@link FederationPolicyInitializationContext},
|
||||||
|
* then checks whether it needs to reinstantiate the class (null or
|
||||||
|
* mismatching type), and reinitialize the policy.
|
||||||
|
*
|
||||||
|
* @param federationPolicyContext the current context
|
||||||
|
* @param oldInstance the existing (possibly null) instance.
|
||||||
|
*
|
||||||
|
* @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
|
||||||
|
* instance
|
||||||
|
*
|
||||||
|
* @throws FederationPolicyInitializationException if the reinitalization is
|
||||||
|
* not valid, and ensure
|
||||||
|
* previous state is preserved
|
||||||
|
*/
|
||||||
|
public FederationAMRMProxyPolicy getAMRMPolicy(
|
||||||
|
FederationPolicyInitializationContext federationPolicyContext,
|
||||||
|
FederationAMRMProxyPolicy oldInstance)
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
|
||||||
|
if (amrmProxyFederationPolicy == null) {
|
||||||
|
throw new FederationPolicyInitializationException("The parameter "
|
||||||
|
+ "amrmProxyFederationPolicy should be initialized in "
|
||||||
|
+ this.getClass().getSimpleName() + " constructor.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return (FederationAMRMProxyPolicy) internalPolicyGetter(
|
||||||
|
federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
throw new FederationPolicyInitializationException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This default implementation validates the
|
||||||
|
* {@link FederationPolicyInitializationContext},
|
||||||
|
* then checks whether it needs to reinstantiate the class (null or
|
||||||
|
* mismatching type), and reinitialize the policy.
|
||||||
|
*
|
||||||
|
* @param federationPolicyContext the current context
|
||||||
|
* @param oldInstance the existing (possibly null) instance.
|
||||||
|
*
|
||||||
|
* @return a valid and fully reinitalized {@link FederationRouterPolicy}
|
||||||
|
* instance
|
||||||
|
*
|
||||||
|
* @throws FederationPolicyInitializationException if the reinitalization is
|
||||||
|
* not valid, and ensure
|
||||||
|
* previous state is preserved
|
||||||
|
*/
|
||||||
|
|
||||||
|
public FederationRouterPolicy getRouterPolicy(
|
||||||
|
FederationPolicyInitializationContext federationPolicyContext,
|
||||||
|
FederationRouterPolicy oldInstance)
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
|
||||||
|
//checks that sub-types properly initialize the types of policies
|
||||||
|
if (routerFederationPolicy == null) {
|
||||||
|
throw new FederationPolicyInitializationException("The policy "
|
||||||
|
+ "type should be initialized in " + this.getClass().getSimpleName()
|
||||||
|
+ " constructor.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return (FederationRouterPolicy) internalPolicyGetter(
|
||||||
|
federationPolicyContext, oldInstance, routerFederationPolicy);
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
throw new FederationPolicyInitializationException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setQueue(String queue) {
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common functionality to instantiate a reinitialize a {@link
|
||||||
|
* ConfigurableFederationPolicy}.
|
||||||
|
*/
|
||||||
|
private ConfigurableFederationPolicy internalPolicyGetter(
|
||||||
|
final FederationPolicyInitializationContext federationPolicyContext,
|
||||||
|
ConfigurableFederationPolicy oldInstance, Class policy)
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
|
||||||
|
FederationPolicyInitializationContextValidator
|
||||||
|
.validate(federationPolicyContext, this.getClass().getCanonicalName());
|
||||||
|
|
||||||
|
if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
|
||||||
|
try {
|
||||||
|
oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
|
||||||
|
} catch (InstantiationException e) {
|
||||||
|
throw new FederationPolicyInitializationException(e);
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
throw new FederationPolicyInitializationException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//copying the context to avoid side-effects
|
||||||
|
FederationPolicyInitializationContext modifiedContext =
|
||||||
|
updateContext(federationPolicyContext,
|
||||||
|
oldInstance.getClass().getCanonicalName());
|
||||||
|
|
||||||
|
oldInstance.reinitialize(modifiedContext);
|
||||||
|
return oldInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is used to copy-on-write the context, that will be passed
|
||||||
|
* downstream to the router/amrmproxy policies.
|
||||||
|
*/
|
||||||
|
private FederationPolicyInitializationContext updateContext(
|
||||||
|
FederationPolicyInitializationContext federationPolicyContext,
|
||||||
|
String type) {
|
||||||
|
// copying configuration and context to avoid modification of original
|
||||||
|
SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
|
||||||
|
.newInstance(federationPolicyContext
|
||||||
|
.getSubClusterPolicyConfiguration());
|
||||||
|
newConf.setType(type);
|
||||||
|
|
||||||
|
return new FederationPolicyInitializationContext(newConf,
|
||||||
|
federationPolicyContext.getFederationSubclusterResolver(),
|
||||||
|
federationPolicyContext.getFederationStateStoreFacade(),
|
||||||
|
federationPolicyContext.getHomeSubcluster());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -41,10 +41,11 @@ public FederationPolicyInitializationContext() {
|
|||||||
|
|
||||||
public FederationPolicyInitializationContext(
|
public FederationPolicyInitializationContext(
|
||||||
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
|
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
|
||||||
FederationStateStoreFacade storeFacade) {
|
FederationStateStoreFacade storeFacade, SubClusterId home) {
|
||||||
this.federationPolicyConfiguration = policy;
|
this.federationPolicyConfiguration = policy;
|
||||||
this.federationSubclusterResolver = resolver;
|
this.federationSubclusterResolver = resolver;
|
||||||
this.federationStateStoreFacade = storeFacade;
|
this.federationStateStoreFacade = storeFacade;
|
||||||
|
this.homeSubcluster = home;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,56 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents a simple implementation of a {@code
|
||||||
|
* FederationPolicyManager}.
|
||||||
|
*
|
||||||
|
* It combines the basic policies: {@link UniformRandomRouterPolicy} and
|
||||||
|
* {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
|
||||||
|
* "spread" the load among sub-clusters uniformly.
|
||||||
|
*
|
||||||
|
* This simple policy might impose heavy load on the RMs and return more
|
||||||
|
* containers than a job requested as all requests are (replicated and)
|
||||||
|
* broadcasted.
|
||||||
|
*/
|
||||||
|
public class UniformBroadcastPolicyManager
|
||||||
|
extends AbstractPolicyManager {
|
||||||
|
|
||||||
|
public UniformBroadcastPolicyManager() {
|
||||||
|
//this structurally hard-codes two compatible policies for Router and
|
||||||
|
// AMRMProxy.
|
||||||
|
routerFederationPolicy = UniformRandomRouterPolicy.class;
|
||||||
|
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SubClusterPolicyConfiguration serializeConf()
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
ByteBuffer buf = ByteBuffer.allocate(0);
|
||||||
|
return SubClusterPolicyConfiguration
|
||||||
|
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,67 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Policy that allows operator to configure "weights" for routing. This picks a
|
||||||
|
* {@link WeightedRandomRouterPolicy} for the router and a {@link
|
||||||
|
* LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
|
||||||
|
* work together.
|
||||||
|
*/
|
||||||
|
public class WeightedLocalityPolicyManager
|
||||||
|
extends AbstractPolicyManager {
|
||||||
|
|
||||||
|
private WeightedPolicyInfo weightedPolicyInfo;
|
||||||
|
|
||||||
|
public WeightedLocalityPolicyManager() {
|
||||||
|
//this structurally hard-codes two compatible policies for Router and
|
||||||
|
// AMRMProxy.
|
||||||
|
routerFederationPolicy = WeightedRandomRouterPolicy.class;
|
||||||
|
amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
|
||||||
|
weightedPolicyInfo = new WeightedPolicyInfo();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SubClusterPolicyConfiguration serializeConf()
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
|
||||||
|
return SubClusterPolicyConfiguration
|
||||||
|
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public WeightedPolicyInfo getWeightedPolicyInfo() {
|
||||||
|
return weightedPolicyInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setWeightedPolicyInfo(
|
||||||
|
WeightedPolicyInfo weightedPolicyInfo) {
|
||||||
|
this.weightedPolicyInfo = weightedPolicyInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -40,6 +40,7 @@
|
|||||||
@Unstable
|
@Unstable
|
||||||
public abstract class SubClusterPolicyConfiguration {
|
public abstract class SubClusterPolicyConfiguration {
|
||||||
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public static SubClusterPolicyConfiguration newInstance(String queue,
|
public static SubClusterPolicyConfiguration newInstance(String queue,
|
||||||
@ -52,6 +53,18 @@ public static SubClusterPolicyConfiguration newInstance(String queue,
|
|||||||
return policy;
|
return policy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public static SubClusterPolicyConfiguration newInstance(
|
||||||
|
SubClusterPolicyConfiguration conf) {
|
||||||
|
SubClusterPolicyConfiguration policy =
|
||||||
|
Records.newRecord(SubClusterPolicyConfiguration.class);
|
||||||
|
policy.setQueue(conf.getQueue());
|
||||||
|
policy.setType(conf.getType());
|
||||||
|
policy.setParams(conf.getParams());
|
||||||
|
return policy;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the name of the queue for which we are configuring a policy.
|
* Get the name of the queue for which we are configuring a policy.
|
||||||
*
|
*
|
||||||
|
@ -0,0 +1,108 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class provides common test methods for testing {@code
|
||||||
|
* FederationPolicyManager}s.
|
||||||
|
*/
|
||||||
|
public abstract class BasePolicyManagerTest {
|
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
|
protected FederationPolicyManager wfp = null;
|
||||||
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
|
protected Class expectedPolicyManager;
|
||||||
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
|
protected Class expectedAMRMProxyPolicy;
|
||||||
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
|
protected Class expectedRouterPolicy;
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerializeAndInstantiate() throws Exception {
|
||||||
|
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||||
|
expectedAMRMProxyPolicy,
|
||||||
|
expectedRouterPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void testSerializeAndInstantiateBad1() throws Exception {
|
||||||
|
serializeAndDeserializePolicyManager(wfp, String.class,
|
||||||
|
expectedAMRMProxyPolicy, expectedRouterPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = AssertionError.class)
|
||||||
|
public void testSerializeAndInstantiateBad2() throws Exception {
|
||||||
|
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||||
|
String.class, expectedRouterPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = AssertionError.class)
|
||||||
|
public void testSerializeAndInstantiateBad3() throws Exception {
|
||||||
|
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||||
|
expectedAMRMProxyPolicy, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void serializeAndDeserializePolicyManager(
|
||||||
|
FederationPolicyManager wfp, Class policyManagerType,
|
||||||
|
Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
|
||||||
|
|
||||||
|
// serializeConf it in a context
|
||||||
|
SubClusterPolicyConfiguration fpc =
|
||||||
|
wfp.serializeConf();
|
||||||
|
fpc.setType(policyManagerType.getCanonicalName());
|
||||||
|
FederationPolicyInitializationContext context = new
|
||||||
|
FederationPolicyInitializationContext();
|
||||||
|
context.setSubClusterPolicyConfiguration(fpc);
|
||||||
|
context
|
||||||
|
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
|
||||||
|
context.setFederationSubclusterResolver(
|
||||||
|
FederationPoliciesTestUtil.initResolver());
|
||||||
|
context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
|
||||||
|
|
||||||
|
// based on the "context" created instantiate new class and use it
|
||||||
|
Class c = Class.forName(wfp.getClass().getCanonicalName());
|
||||||
|
FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
|
||||||
|
|
||||||
|
FederationAMRMProxyPolicy federationAMRMProxyPolicy =
|
||||||
|
wfp2.getAMRMPolicy(context, null);
|
||||||
|
|
||||||
|
//needed only for tests (getARMRMPolicy change the "type" in conf)
|
||||||
|
fpc.setType(wfp.getClass().getCanonicalName());
|
||||||
|
|
||||||
|
FederationRouterPolicy federationRouterPolicy =
|
||||||
|
wfp2.getRouterPolicy(context, null);
|
||||||
|
|
||||||
|
Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
|
||||||
|
expAMRMProxyPolicy);
|
||||||
|
|
||||||
|
Assert.assertEquals(federationRouterPolicy.getClass(),
|
||||||
|
expRouterPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -24,6 +24,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||||
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
|
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
|
||||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
@ -38,6 +39,7 @@ public class TestFederationPolicyInitializationContextValidator {
|
|||||||
private SubClusterPolicyConfiguration goodConfig;
|
private SubClusterPolicyConfiguration goodConfig;
|
||||||
private SubClusterResolver goodSR;
|
private SubClusterResolver goodSR;
|
||||||
private FederationStateStoreFacade goodFacade;
|
private FederationStateStoreFacade goodFacade;
|
||||||
|
private SubClusterId goodHome;
|
||||||
private FederationPolicyInitializationContext context;
|
private FederationPolicyInitializationContext context;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@ -45,8 +47,9 @@ public void setUp() throws Exception {
|
|||||||
goodFacade = FederationPoliciesTestUtil.initFacade();
|
goodFacade = FederationPoliciesTestUtil.initFacade();
|
||||||
goodConfig = new MockPolicyManager().serializeConf();
|
goodConfig = new MockPolicyManager().serializeConf();
|
||||||
goodSR = FederationPoliciesTestUtil.initResolver();
|
goodSR = FederationPoliciesTestUtil.initResolver();
|
||||||
|
goodHome = SubClusterId.newInstance("homesubcluster");
|
||||||
context = new FederationPolicyInitializationContext(goodConfig, goodSR,
|
context = new FederationPolicyInitializationContext(goodConfig, goodSR,
|
||||||
goodFacade);
|
goodFacade, goodHome);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -0,0 +1,40 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple test of {@link UniformBroadcastPolicyManager}.
|
||||||
|
*/
|
||||||
|
public class TestUniformBroadcastPolicyManager extends BasePolicyManagerTest {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
//config policy
|
||||||
|
wfp = new UniformBroadcastPolicyManager();
|
||||||
|
wfp.setQueue("queue1");
|
||||||
|
|
||||||
|
//set expected params that the base test class will use for tests
|
||||||
|
expectedPolicyManager = UniformBroadcastPolicyManager.class;
|
||||||
|
expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
|
||||||
|
expectedRouterPolicy = UniformRandomRouterPolicy.class;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple test of {@link WeightedLocalityPolicyManager}.
|
||||||
|
*/
|
||||||
|
public class TestWeightedLocalityPolicyManager extends
|
||||||
|
BasePolicyManagerTest {
|
||||||
|
|
||||||
|
private WeightedPolicyInfo policyInfo;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
// configure a policy
|
||||||
|
|
||||||
|
wfp = new WeightedLocalityPolicyManager();
|
||||||
|
wfp.setQueue("queue1");
|
||||||
|
SubClusterId sc1 = SubClusterId.newInstance("sc1");
|
||||||
|
SubClusterId sc2 = SubClusterId.newInstance("sc2");
|
||||||
|
policyInfo = new WeightedPolicyInfo();
|
||||||
|
|
||||||
|
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
|
||||||
|
routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
|
||||||
|
routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
|
||||||
|
policyInfo.setRouterPolicyWeights(routerWeights);
|
||||||
|
|
||||||
|
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
|
||||||
|
amrmWeights.put(new SubClusterIdInfo(sc1), 0.2f);
|
||||||
|
amrmWeights.put(new SubClusterIdInfo(sc2), 0.8f);
|
||||||
|
policyInfo.setAMRMPolicyWeights(amrmWeights);
|
||||||
|
|
||||||
|
((WeightedLocalityPolicyManager) wfp).setWeightedPolicyInfo(
|
||||||
|
policyInfo);
|
||||||
|
|
||||||
|
//set expected params that the base test class will use for tests
|
||||||
|
expectedPolicyManager = WeightedLocalityPolicyManager.class;
|
||||||
|
expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
|
||||||
|
expectedRouterPolicy = WeightedRandomRouterPolicy.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPolicyInfoSetCorrectly() throws Exception {
|
||||||
|
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||||
|
expectedAMRMProxyPolicy,
|
||||||
|
expectedRouterPolicy);
|
||||||
|
|
||||||
|
//check the policyInfo propagates through ser/der correctly
|
||||||
|
Assert.assertEquals(((WeightedLocalityPolicyManager) wfp)
|
||||||
|
.getWeightedPolicyInfo(), policyInfo);
|
||||||
|
}
|
||||||
|
}
|
@ -143,7 +143,7 @@ public static void initializePolicyContext(
|
|||||||
SubClusterInfo> activeSubclusters) throws YarnException {
|
SubClusterInfo> activeSubclusters) throws YarnException {
|
||||||
FederationPolicyInitializationContext context =
|
FederationPolicyInitializationContext context =
|
||||||
new FederationPolicyInitializationContext(null, initResolver(),
|
new FederationPolicyInitializationContext(null, initResolver(),
|
||||||
initFacade());
|
initFacade(), SubClusterId.newInstance("homesubcluster"));
|
||||||
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
|
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user