YARN-5391. PolicyManager to tie together Router/AMRM Federation policies. (Carlo Curino via Subru).
This commit is contained in:
parent
11c5336522
commit
20893682ec
|
@ -0,0 +1,175 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class provides basic implementation for common methods that multiple
|
||||
* policies will need to implement.
|
||||
*/
|
||||
public abstract class AbstractPolicyManager implements
|
||||
FederationPolicyManager {
|
||||
|
||||
private String queue;
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected Class routerFederationPolicy;
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected Class amrmProxyFederationPolicy;
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractPolicyManager.class);
|
||||
/**
|
||||
* This default implementation validates the
|
||||
* {@link FederationPolicyInitializationContext},
|
||||
* then checks whether it needs to reinstantiate the class (null or
|
||||
* mismatching type), and reinitialize the policy.
|
||||
*
|
||||
* @param federationPolicyContext the current context
|
||||
* @param oldInstance the existing (possibly null) instance.
|
||||
*
|
||||
* @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
|
||||
* instance
|
||||
*
|
||||
* @throws FederationPolicyInitializationException if the reinitalization is
|
||||
* not valid, and ensure
|
||||
* previous state is preserved
|
||||
*/
|
||||
public FederationAMRMProxyPolicy getAMRMPolicy(
|
||||
FederationPolicyInitializationContext federationPolicyContext,
|
||||
FederationAMRMProxyPolicy oldInstance)
|
||||
throws FederationPolicyInitializationException {
|
||||
|
||||
if (amrmProxyFederationPolicy == null) {
|
||||
throw new FederationPolicyInitializationException("The parameter "
|
||||
+ "amrmProxyFederationPolicy should be initialized in "
|
||||
+ this.getClass().getSimpleName() + " constructor.");
|
||||
}
|
||||
|
||||
try {
|
||||
return (FederationAMRMProxyPolicy) internalPolicyGetter(
|
||||
federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
|
||||
} catch (ClassCastException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This default implementation validates the
|
||||
* {@link FederationPolicyInitializationContext},
|
||||
* then checks whether it needs to reinstantiate the class (null or
|
||||
* mismatching type), and reinitialize the policy.
|
||||
*
|
||||
* @param federationPolicyContext the current context
|
||||
* @param oldInstance the existing (possibly null) instance.
|
||||
*
|
||||
* @return a valid and fully reinitalized {@link FederationRouterPolicy}
|
||||
* instance
|
||||
*
|
||||
* @throws FederationPolicyInitializationException if the reinitalization is
|
||||
* not valid, and ensure
|
||||
* previous state is preserved
|
||||
*/
|
||||
|
||||
public FederationRouterPolicy getRouterPolicy(
|
||||
FederationPolicyInitializationContext federationPolicyContext,
|
||||
FederationRouterPolicy oldInstance)
|
||||
throws FederationPolicyInitializationException {
|
||||
|
||||
//checks that sub-types properly initialize the types of policies
|
||||
if (routerFederationPolicy == null) {
|
||||
throw new FederationPolicyInitializationException("The policy "
|
||||
+ "type should be initialized in " + this.getClass().getSimpleName()
|
||||
+ " constructor.");
|
||||
}
|
||||
|
||||
try {
|
||||
return (FederationRouterPolicy) internalPolicyGetter(
|
||||
federationPolicyContext, oldInstance, routerFederationPolicy);
|
||||
} catch (ClassCastException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueue(String queue) {
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Common functionality to instantiate a reinitialize a {@link
|
||||
* ConfigurableFederationPolicy}.
|
||||
*/
|
||||
private ConfigurableFederationPolicy internalPolicyGetter(
|
||||
final FederationPolicyInitializationContext federationPolicyContext,
|
||||
ConfigurableFederationPolicy oldInstance, Class policy)
|
||||
throws FederationPolicyInitializationException {
|
||||
|
||||
FederationPolicyInitializationContextValidator
|
||||
.validate(federationPolicyContext, this.getClass().getCanonicalName());
|
||||
|
||||
if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
|
||||
try {
|
||||
oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
|
||||
} catch (InstantiationException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new FederationPolicyInitializationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
//copying the context to avoid side-effects
|
||||
FederationPolicyInitializationContext modifiedContext =
|
||||
updateContext(federationPolicyContext,
|
||||
oldInstance.getClass().getCanonicalName());
|
||||
|
||||
oldInstance.reinitialize(modifiedContext);
|
||||
return oldInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used to copy-on-write the context, that will be passed
|
||||
* downstream to the router/amrmproxy policies.
|
||||
*/
|
||||
private FederationPolicyInitializationContext updateContext(
|
||||
FederationPolicyInitializationContext federationPolicyContext,
|
||||
String type) {
|
||||
// copying configuration and context to avoid modification of original
|
||||
SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
|
||||
.newInstance(federationPolicyContext
|
||||
.getSubClusterPolicyConfiguration());
|
||||
newConf.setType(type);
|
||||
|
||||
return new FederationPolicyInitializationContext(newConf,
|
||||
federationPolicyContext.getFederationSubclusterResolver(),
|
||||
federationPolicyContext.getFederationStateStoreFacade(),
|
||||
federationPolicyContext.getHomeSubcluster());
|
||||
}
|
||||
|
||||
}
|
|
@ -41,10 +41,11 @@ public class FederationPolicyInitializationContext {
|
|||
|
||||
public FederationPolicyInitializationContext(
|
||||
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
|
||||
FederationStateStoreFacade storeFacade) {
|
||||
FederationStateStoreFacade storeFacade, SubClusterId home) {
|
||||
this.federationPolicyConfiguration = policy;
|
||||
this.federationSubclusterResolver = resolver;
|
||||
this.federationStateStoreFacade = storeFacade;
|
||||
this.homeSubcluster = home;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* This class represents a simple implementation of a {@code
|
||||
* FederationPolicyManager}.
|
||||
*
|
||||
* It combines the basic policies: {@link UniformRandomRouterPolicy} and
|
||||
* {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
|
||||
* "spread" the load among sub-clusters uniformly.
|
||||
*
|
||||
* This simple policy might impose heavy load on the RMs and return more
|
||||
* containers than a job requested as all requests are (replicated and)
|
||||
* broadcasted.
|
||||
*/
|
||||
public class UniformBroadcastPolicyManager
|
||||
extends AbstractPolicyManager {
|
||||
|
||||
public UniformBroadcastPolicyManager() {
|
||||
//this structurally hard-codes two compatible policies for Router and
|
||||
// AMRMProxy.
|
||||
routerFederationPolicy = UniformRandomRouterPolicy.class;
|
||||
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterPolicyConfiguration serializeConf()
|
||||
throws FederationPolicyInitializationException {
|
||||
ByteBuffer buf = ByteBuffer.allocate(0);
|
||||
return SubClusterPolicyConfiguration
|
||||
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* Policy that allows operator to configure "weights" for routing. This picks a
|
||||
* {@link WeightedRandomRouterPolicy} for the router and a {@link
|
||||
* LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
|
||||
* work together.
|
||||
*/
|
||||
public class WeightedLocalityPolicyManager
|
||||
extends AbstractPolicyManager {
|
||||
|
||||
private WeightedPolicyInfo weightedPolicyInfo;
|
||||
|
||||
public WeightedLocalityPolicyManager() {
|
||||
//this structurally hard-codes two compatible policies for Router and
|
||||
// AMRMProxy.
|
||||
routerFederationPolicy = WeightedRandomRouterPolicy.class;
|
||||
amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
|
||||
weightedPolicyInfo = new WeightedPolicyInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterPolicyConfiguration serializeConf()
|
||||
throws FederationPolicyInitializationException {
|
||||
ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
|
||||
return SubClusterPolicyConfiguration
|
||||
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public WeightedPolicyInfo getWeightedPolicyInfo() {
|
||||
return weightedPolicyInfo;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setWeightedPolicyInfo(
|
||||
WeightedPolicyInfo weightedPolicyInfo) {
|
||||
this.weightedPolicyInfo = weightedPolicyInfo;
|
||||
}
|
||||
|
||||
}
|
|
@ -40,6 +40,7 @@ import java.nio.ByteBuffer;
|
|||
@Unstable
|
||||
public abstract class SubClusterPolicyConfiguration {
|
||||
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static SubClusterPolicyConfiguration newInstance(String queue,
|
||||
|
@ -52,6 +53,18 @@ public abstract class SubClusterPolicyConfiguration {
|
|||
return policy;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static SubClusterPolicyConfiguration newInstance(
|
||||
SubClusterPolicyConfiguration conf) {
|
||||
SubClusterPolicyConfiguration policy =
|
||||
Records.newRecord(SubClusterPolicyConfiguration.class);
|
||||
policy.setQueue(conf.getQueue());
|
||||
policy.setType(conf.getType());
|
||||
policy.setParams(conf.getParams());
|
||||
return policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of the queue for which we are configuring a policy.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class provides common test methods for testing {@code
|
||||
* FederationPolicyManager}s.
|
||||
*/
|
||||
public abstract class BasePolicyManagerTest {
|
||||
|
||||
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected FederationPolicyManager wfp = null;
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected Class expectedPolicyManager;
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected Class expectedAMRMProxyPolicy;
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected Class expectedRouterPolicy;
|
||||
|
||||
|
||||
@Test
|
||||
public void testSerializeAndInstantiate() throws Exception {
|
||||
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||
expectedAMRMProxyPolicy,
|
||||
expectedRouterPolicy);
|
||||
}
|
||||
|
||||
@Test(expected = FederationPolicyInitializationException.class)
|
||||
public void testSerializeAndInstantiateBad1() throws Exception {
|
||||
serializeAndDeserializePolicyManager(wfp, String.class,
|
||||
expectedAMRMProxyPolicy, expectedRouterPolicy);
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testSerializeAndInstantiateBad2() throws Exception {
|
||||
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||
String.class, expectedRouterPolicy);
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testSerializeAndInstantiateBad3() throws Exception {
|
||||
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||
expectedAMRMProxyPolicy, String.class);
|
||||
}
|
||||
|
||||
protected static void serializeAndDeserializePolicyManager(
|
||||
FederationPolicyManager wfp, Class policyManagerType,
|
||||
Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
|
||||
|
||||
// serializeConf it in a context
|
||||
SubClusterPolicyConfiguration fpc =
|
||||
wfp.serializeConf();
|
||||
fpc.setType(policyManagerType.getCanonicalName());
|
||||
FederationPolicyInitializationContext context = new
|
||||
FederationPolicyInitializationContext();
|
||||
context.setSubClusterPolicyConfiguration(fpc);
|
||||
context
|
||||
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
|
||||
context.setFederationSubclusterResolver(
|
||||
FederationPoliciesTestUtil.initResolver());
|
||||
context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
|
||||
|
||||
// based on the "context" created instantiate new class and use it
|
||||
Class c = Class.forName(wfp.getClass().getCanonicalName());
|
||||
FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
|
||||
|
||||
FederationAMRMProxyPolicy federationAMRMProxyPolicy =
|
||||
wfp2.getAMRMPolicy(context, null);
|
||||
|
||||
//needed only for tests (getARMRMPolicy change the "type" in conf)
|
||||
fpc.setType(wfp.getClass().getCanonicalName());
|
||||
|
||||
FederationRouterPolicy federationRouterPolicy =
|
||||
wfp2.getRouterPolicy(context, null);
|
||||
|
||||
Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
|
||||
expAMRMProxyPolicy);
|
||||
|
||||
Assert.assertEquals(federationRouterPolicy.getClass(),
|
||||
expRouterPolicy);
|
||||
}
|
||||
|
||||
}
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMR
|
|||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
|
@ -38,6 +39,7 @@ public class TestFederationPolicyInitializationContextValidator {
|
|||
private SubClusterPolicyConfiguration goodConfig;
|
||||
private SubClusterResolver goodSR;
|
||||
private FederationStateStoreFacade goodFacade;
|
||||
private SubClusterId goodHome;
|
||||
private FederationPolicyInitializationContext context;
|
||||
|
||||
@Before
|
||||
|
@ -45,8 +47,9 @@ public class TestFederationPolicyInitializationContextValidator {
|
|||
goodFacade = FederationPoliciesTestUtil.initFacade();
|
||||
goodConfig = new MockPolicyManager().serializeConf();
|
||||
goodSR = FederationPoliciesTestUtil.initResolver();
|
||||
goodHome = SubClusterId.newInstance("homesubcluster");
|
||||
context = new FederationPolicyInitializationContext(goodConfig, goodSR,
|
||||
goodFacade);
|
||||
goodFacade, goodHome);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
|
||||
import org.junit.Before;
|
||||
|
||||
/**
|
||||
* Simple test of {@link UniformBroadcastPolicyManager}.
|
||||
*/
|
||||
public class TestUniformBroadcastPolicyManager extends BasePolicyManagerTest {
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
//config policy
|
||||
wfp = new UniformBroadcastPolicyManager();
|
||||
wfp.setQueue("queue1");
|
||||
|
||||
//set expected params that the base test class will use for tests
|
||||
expectedPolicyManager = UniformBroadcastPolicyManager.class;
|
||||
expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
|
||||
expectedRouterPolicy = UniformRandomRouterPolicy.class;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.policies;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Simple test of {@link WeightedLocalityPolicyManager}.
|
||||
*/
|
||||
public class TestWeightedLocalityPolicyManager extends
|
||||
BasePolicyManagerTest {
|
||||
|
||||
private WeightedPolicyInfo policyInfo;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
// configure a policy
|
||||
|
||||
wfp = new WeightedLocalityPolicyManager();
|
||||
wfp.setQueue("queue1");
|
||||
SubClusterId sc1 = SubClusterId.newInstance("sc1");
|
||||
SubClusterId sc2 = SubClusterId.newInstance("sc2");
|
||||
policyInfo = new WeightedPolicyInfo();
|
||||
|
||||
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
|
||||
routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
|
||||
routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
|
||||
policyInfo.setRouterPolicyWeights(routerWeights);
|
||||
|
||||
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
|
||||
amrmWeights.put(new SubClusterIdInfo(sc1), 0.2f);
|
||||
amrmWeights.put(new SubClusterIdInfo(sc2), 0.8f);
|
||||
policyInfo.setAMRMPolicyWeights(amrmWeights);
|
||||
|
||||
((WeightedLocalityPolicyManager) wfp).setWeightedPolicyInfo(
|
||||
policyInfo);
|
||||
|
||||
//set expected params that the base test class will use for tests
|
||||
expectedPolicyManager = WeightedLocalityPolicyManager.class;
|
||||
expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
|
||||
expectedRouterPolicy = WeightedRandomRouterPolicy.class;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPolicyInfoSetCorrectly() throws Exception {
|
||||
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
|
||||
expectedAMRMProxyPolicy,
|
||||
expectedRouterPolicy);
|
||||
|
||||
//check the policyInfo propagates through ser/der correctly
|
||||
Assert.assertEquals(((WeightedLocalityPolicyManager) wfp)
|
||||
.getWeightedPolicyInfo(), policyInfo);
|
||||
}
|
||||
}
|
|
@ -143,7 +143,7 @@ public final class FederationPoliciesTestUtil {
|
|||
SubClusterInfo> activeSubclusters) throws YarnException {
|
||||
FederationPolicyInitializationContext context =
|
||||
new FederationPolicyInitializationContext(null, initResolver(),
|
||||
initFacade());
|
||||
initFacade(), SubClusterId.newInstance("homesubcluster"));
|
||||
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue