YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru).

(cherry picked from commit 575137f41c)
This commit is contained in:
Subru Krishnan 2016-11-22 15:02:22 -08:00 committed by Carlo Curino
parent 083a05bc14
commit 4128c9522d
29 changed files with 414 additions and 52 deletions

View File

@ -2600,7 +2600,8 @@ public class YarnConfiguration extends Configuration {
+ "policy-manager"; + "policy-manager";
public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache" public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
+ ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager"; + ".hadoop.yarn.server.federation.policies"
+ ".manager.UniformBroadcastPolicyManager";
public static final String FEDERATION_POLICY_MANAGER_PARAMS = public static final String FEDERATION_POLICY_MANAGER_PARAMS =
FEDERATION_PREFIX + "policy-manager-params"; FEDERATION_PREFIX + "policy-manager-params";

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

View File

@ -15,8 +15,11 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
/** /**
* This class provides basic implementation for common methods that multiple * This class provides basic implementation for common methods that multiple
* policies will need to implement. * policies will need to implement.
@ -112,6 +117,16 @@ public abstract class AbstractPolicyManager implements
} }
} }
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
// default implementation works only for sub-classes which do not require
// any parameters
ByteBuffer buf = ByteBuffer.allocate(0);
return SubClusterPolicyConfiguration
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
}
@Override @Override
public String getQueue() { public String getQueue() {
return queue; return queue;

View File

@ -15,8 +15,9 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;
/**
* Policy that routes applications via hashing of their queuename, and broadcast
* resource requests. This picks a {@link HashBasedRouterPolicy} for the router
* and a {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed
* to work together.
*/
public class HashBroadcastPolicyManager extends AbstractPolicyManager {
public HashBroadcastPolicyManager() {
// this structurally hard-codes two compatible policies for Router and
// AMRMProxy.
routerFederationPolicy = HashBasedRouterPolicy.class;
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
}
}

View File

@ -15,7 +15,7 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;

View File

@ -15,14 +15,10 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import java.nio.ByteBuffer;
/** /**
* This class represents a simple implementation of a {@code * This class represents a simple implementation of a {@code
@ -36,21 +32,13 @@ import java.nio.ByteBuffer;
* containers than a job requested as all requests are (replicated and) * containers than a job requested as all requests are (replicated and)
* broadcasted. * broadcasted.
*/ */
public class UniformBroadcastPolicyManager public class UniformBroadcastPolicyManager extends AbstractPolicyManager {
extends AbstractPolicyManager {
public UniformBroadcastPolicyManager() { public UniformBroadcastPolicyManager() {
//this structurally hard-codes two compatible policies for Router and // this structurally hard-codes two compatible policies for Router and
// AMRMProxy. // AMRMProxy.
routerFederationPolicy = UniformRandomRouterPolicy.class; routerFederationPolicy = UniformRandomRouterPolicy.class;
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
} }
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = ByteBuffer.allocate(0);
return SubClusterPolicyConfiguration
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
}
} }

View File

@ -15,7 +15,7 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;

View File

@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Various implementation of FederationPolicyManager. **/
package org.apache.hadoop.yarn.server.federation.policies.manager;

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -44,4 +47,20 @@ public abstract class AbstractRouterPolicy extends
} }
} }
public void validate(ApplicationSubmissionContext appSubmissionContext)
throws FederationPolicyException {
if (appSubmissionContext == null) {
throw new FederationPolicyException(
"Cannot route an application with null context.");
}
// if the queue is not specified we set it to default value, to be
// compatible with YARN behavior.
String queue = appSubmissionContext.getQueue();
if (queue == null) {
appSubmissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
}
} }

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* This {@link FederationRouterPolicy} pick a subcluster based on the hash of
* the job's queue name. Useful to provide a default behavior when too many
* queues exist in a system. This also ensures that all jobs belonging to a
* queue are mapped to the same sub-cluster (likely help with locality).
*/
public class HashBasedRouterPolicy extends AbstractRouterPolicy {
@Override
public void reinitialize(
FederationPolicyInitializationContext federationPolicyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
// note: this overrides BaseRouterPolicy and ignores the weights
setPolicyContext(federationPolicyContext);
}
/**
* Simply picks from alphabetically-sorted active subclusters based on the
* hash of quey name. Jobs of the same queue will all be routed to the same
* sub-cluster, as far as the number of active sub-cluster and their names
* remain the same.
*
* @param appSubmissionContext the context for the app being submitted.
*
* @return a hash-based chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
// throws if no active subclusters available
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
validate(appSubmissionContext);
int chosenPosition = Math.abs(
appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
Collections.sort(list);
return list.get(chosenPosition);
}
}

View File

@ -64,6 +64,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster( public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException { ApplicationSubmissionContext appSubmissionContext) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters = Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters(); getActiveSubclusters();

View File

@ -36,6 +36,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster( public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException { ApplicationSubmissionContext appSubmissionContext) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters = Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters(); getActiveSubclusters();

View File

@ -48,11 +48,10 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
} }
@Override @Override
public void reinitialize( public void reinitialize(FederationPolicyInitializationContext policyContext)
FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException { throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator FederationPolicyInitializationContextValidator.validate(policyContext,
.validate(policyContext, this.getClass().getCanonicalName()); this.getClass().getCanonicalName());
// note: this overrides AbstractRouterPolicy and ignores the weights // note: this overrides AbstractRouterPolicy and ignores the weights
@ -73,6 +72,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster( public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException { ApplicationSubmissionContext appSubmissionContext) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters = Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters(); getActiveSubclusters();

View File

@ -43,6 +43,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster( public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException { ApplicationSubmissionContext appSubmissionContext) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
Map<SubClusterId, SubClusterInfo> activeSubclusters = Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters(); getActiveSubclusters();

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
@ -35,8 +36,10 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Test; import org.junit.Test;
@ -46,7 +49,7 @@ import org.junit.Test;
public abstract class BaseFederationPoliciesTest { public abstract class BaseFederationPoliciesTest {
private ConfigurableFederationPolicy policy; private ConfigurableFederationPolicy policy;
private WeightedPolicyInfo policyInfo; private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class);
private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>(); private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
private FederationPolicyInitializationContext federationPolicyContext; private FederationPolicyInitializationContext federationPolicyContext;
private ApplicationSubmissionContext applicationSubmissionContext = private ApplicationSubmissionContext applicationSubmissionContext =
@ -103,7 +106,7 @@ public abstract class BaseFederationPoliciesTest {
((FederationRouterPolicy) localPolicy) ((FederationRouterPolicy) localPolicy)
.getHomeSubcluster(getApplicationSubmissionContext()); .getHomeSubcluster(getApplicationSubmissionContext());
} else { } else {
String[] hosts = new String[] {"host1", "host2" }; String[] hosts = new String[] {"host1", "host2"};
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
((FederationAMRMProxyPolicy) localPolicy) ((FederationAMRMProxyPolicy) localPolicy)
@ -170,4 +173,14 @@ public abstract class BaseFederationPoliciesTest {
this.homeSubCluster = homeSubCluster; this.homeSubCluster = homeSubCluster;
} }
public void setMockActiveSubclusters(int numSubclusters) {
for (int i = 1; i <= numSubclusters; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
}
} }

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;

View File

@ -15,8 +15,9 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
@ -32,7 +33,6 @@ import org.junit.Test;
*/ */
public abstract class BasePolicyManagerTest { public abstract class BasePolicyManagerTest {
@SuppressWarnings("checkstyle:visibilitymodifier") @SuppressWarnings("checkstyle:visibilitymodifier")
protected FederationPolicyManager wfp = null; protected FederationPolicyManager wfp = null;
@SuppressWarnings("checkstyle:visibilitymodifier") @SuppressWarnings("checkstyle:visibilitymodifier")
@ -42,12 +42,10 @@ public abstract class BasePolicyManagerTest {
@SuppressWarnings("checkstyle:visibilitymodifier") @SuppressWarnings("checkstyle:visibilitymodifier")
protected Class expectedRouterPolicy; protected Class expectedRouterPolicy;
@Test @Test
public void testSerializeAndInstantiate() throws Exception { public void testSerializeAndInstantiate() throws Exception {
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
expectedAMRMProxyPolicy, expectedAMRMProxyPolicy, expectedRouterPolicy);
expectedRouterPolicy);
} }
@Test(expected = FederationPolicyInitializationException.class) @Test(expected = FederationPolicyInitializationException.class)
@ -73,11 +71,10 @@ public abstract class BasePolicyManagerTest {
Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception { Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
// serializeConf it in a context // serializeConf it in a context
SubClusterPolicyConfiguration fpc = SubClusterPolicyConfiguration fpc = wfp.serializeConf();
wfp.serializeConf();
fpc.setType(policyManagerType.getCanonicalName()); fpc.setType(policyManagerType.getCanonicalName());
FederationPolicyInitializationContext context = new FederationPolicyInitializationContext context =
FederationPolicyInitializationContext(); new FederationPolicyInitializationContext();
context.setSubClusterPolicyConfiguration(fpc); context.setSubClusterPolicyConfiguration(fpc);
context context
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade()); .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
@ -92,7 +89,7 @@ public abstract class BasePolicyManagerTest {
FederationAMRMProxyPolicy federationAMRMProxyPolicy = FederationAMRMProxyPolicy federationAMRMProxyPolicy =
wfp2.getAMRMPolicy(context, null); wfp2.getAMRMPolicy(context, null);
//needed only for tests (getARMRMPolicy change the "type" in conf) // needed only for tests (getARMRMPolicy change the "type" in conf)
fpc.setType(wfp.getClass().getCanonicalName()); fpc.setType(wfp.getClass().getCanonicalName());
FederationRouterPolicy federationRouterPolicy = FederationRouterPolicy federationRouterPolicy =
@ -101,8 +98,7 @@ public abstract class BasePolicyManagerTest {
Assert.assertEquals(federationAMRMProxyPolicy.getClass(), Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
expAMRMProxyPolicy); expAMRMProxyPolicy);
Assert.assertEquals(federationRouterPolicy.getClass(), Assert.assertEquals(federationRouterPolicy.getClass(), expRouterPolicy);
expRouterPolicy);
} }
} }

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;
import org.junit.Before;
/**
* Simple test of {@link HashBroadcastPolicyManager}.
*/
public class TestHashBasedBroadcastPolicyManager extends BasePolicyManagerTest {
@Before
public void setup() {
// config policy
wfp = new HashBroadcastPolicyManager();
wfp.setQueue("queue1");
// set expected params that the base test class will use for tests
expectedPolicyManager = HashBroadcastPolicyManager.class;
expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
expectedRouterPolicy = HashBasedRouterPolicy.class;
}
}

View File

@ -15,7 +15,7 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;

View File

@ -15,7 +15,7 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;

View File

@ -15,7 +15,7 @@
* the License. * the License.
*/ */
package org.apache.hadoop.yarn.server.federation.policies; package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
/**
* Base class for router policies tests, tests for null input cases.
*/
public abstract class BaseRouterPoliciesTest
extends BaseFederationPoliciesTest {
@Test
public void testNullQueueRouting() throws YarnException {
FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
SubClusterId chosen =
localPolicy.getHomeSubcluster(applicationSubmissionContext);
Assert.assertNotNull(chosen);
}
@Test(expected = FederationPolicyException.class)
public void testNullAppContext() throws YarnException {
((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null);
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test class for the {@link HashBasedRouterPolicy}. Tests that one of
* the active sub-cluster is chosen.
*/
public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest {
private int numSubclusters = 10;
@Before
public void setUp() throws Exception {
// set policy in base class
setPolicy(new HashBasedRouterPolicy());
// setting up the active sub-clusters for this test
setMockActiveSubclusters(numSubclusters);
// initialize policy with context
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
}
@Test
public void testHashSpreadUniformlyAmongSubclusters() throws YarnException {
SubClusterId chosen;
Map<SubClusterId, AtomicLong> counter = new HashMap<>();
for (SubClusterId id : getActiveSubclusters().keySet()) {
counter.put(id, new AtomicLong(0));
}
long jobPerSub = 100;
ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
for (int i = 0; i < jobPerSub * numSubclusters; i++) {
when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i);
chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(applicationSubmissionContext);
counter.get(chosen).addAndGet(1);
}
// hash spread the jobs equally among the subclusters
for (AtomicLong a : counter.values()) {
Assert.assertEquals(a.get(), jobPerSub);
}
}
}

View File

@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -36,7 +35,7 @@ import org.junit.Test;
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
* is properly considered for allocation. * is properly considered for allocation.
*/ */
public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {

View File

@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -38,7 +37,7 @@ import org.junit.Test;
* Simple test class for the {@link PriorityRouterPolicy}. Tests that the * Simple test class for the {@link PriorityRouterPolicy}. Tests that the
* weights are correctly used for ordering the choice of sub-clusters. * weights are correctly used for ordering the choice of sub-clusters.
*/ */
public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest { public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {

View File

@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -36,7 +35,7 @@ import org.junit.Test;
* Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one * Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one
* of the active subcluster is chosen. * of the active subcluster is chosen.
*/ */
public class TestUniformRandomRouterPolicy extends BaseFederationPoliciesTest { public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {

View File

@ -24,8 +24,8 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -41,7 +41,7 @@ import org.junit.Test;
* number of randomized tests to check we are weighiting correctly even if * number of randomized tests to check we are weighiting correctly even if
* clusters go inactive. * clusters go inactive.
*/ */
public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest { public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -78,13 +78,18 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
@Test @Test
public void testClusterChosenWithRightProbability() throws YarnException { public void testClusterChosenWithRightProbability() throws YarnException {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
when(context.getQueue()).thenReturn("queue1");
setApplicationSubmissionContext(context);
Map<SubClusterId, AtomicLong> counter = new HashMap<>(); Map<SubClusterId, AtomicLong> counter = new HashMap<>();
for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights() for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights()
.keySet()) { .keySet()) {
counter.put(id.toId(), new AtomicLong(0)); counter.put(id.toId(), new AtomicLong(0));
} }
float numberOfDraws = 1000000; float numberOfDraws = 100000;
for (float i = 0; i < numberOfDraws; i++) { for (float i = 0; i < numberOfDraws; i++) {
SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()) SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
@ -113,8 +118,8 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
Assert.assertTrue( Assert.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: " + actualWeight "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ " expected weight: " + expectedWeight, + " expected weight: " + expectedWeight,
expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1 expectedWeight == 0 || (actualWeight / expectedWeight) < 1.2
&& (actualWeight / expectedWeight) > 0.9); && (actualWeight / expectedWeight) > 0.8);
} else { } else {
Assert Assert
.assertTrue( .assertTrue(