YARN-8982. [Router] Add locality policy. Contributed by Young Chen.
parent a7d72c523a, commit bf8686f43f
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Utility class for Federation policy.
  */
@@ -48,7 +50,7 @@ public final class FederationPolicyUtils {
   public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
       "No active SubCluster available to submit the request.";
 
-  private static final Random RAND = new Random(System.currentTimeMillis());
+  private static Random rand = new Random(System.currentTimeMillis());
 
   /** Disable constructor. */
   private FederationPolicyUtils() {
@@ -223,7 +225,7 @@ public final class FederationPolicyUtils {
     if (totalWeight == 0) {
       return -1;
     }
-    float samplePoint = RAND.nextFloat() * totalWeight;
+    float samplePoint = rand.nextFloat() * totalWeight;
     int lastIndex = 0;
     for (i = 0; i < weights.size(); i++) {
       if (weights.get(i) > 0) {
@@ -239,4 +241,9 @@
     // float rounding kicks in during subtractions
     return lastIndex;
   }
+
+  @VisibleForTesting
+  public static void setRand(long seed){
+    rand.setSeed(seed);
+  }
 }
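The hunk above makes the shared weighted sampler seedable: the Random instance is no longer final, and tests can pin its seed through setRand. A minimal standalone sketch of the same selection scheme follows; class and method names are illustrative and not part of the patch.

import java.util.List;
import java.util.Random;

public final class WeightedPickSketch {
  // Shared generator; a test can call seed() to make picks reproducible.
  private static Random rand = new Random(System.currentTimeMillis());

  public static void seed(long seed) {
    rand.setSeed(seed);
  }

  /**
   * Returns the index of a weight picked with probability proportional to
   * its value, or -1 if all weights are zero.
   */
  public static int pick(List<Float> weights) {
    float totalWeight = 0;
    for (float w : weights) {
      if (w > 0) {
        totalWeight += w;
      }
    }
    if (totalWeight == 0) {
      return -1;
    }
    // Sample a point in [0, totalWeight) and walk the weights until we pass it.
    float samplePoint = rand.nextFloat() * totalWeight;
    int lastIndex = 0;
    for (int i = 0; i < weights.size(); i++) {
      if (weights.get(i) > 0) {
        samplePoint -= weights.get(i);
        lastIndex = i;
        if (samplePoint <= 0) {
          return i;
        }
      }
    }
    // float rounding kicks in during subtractions
    return lastIndex;
  }

  private WeightedPickSketch() {
  }
}

Calling seed(5) before pick(...) makes the selection sequence repeatable, which mirrors how the tests later in this patch call FederationPolicyUtils.setRand(5).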
@@ -17,18 +17,19 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.manager;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.LocalityRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 
-import java.nio.ByteBuffer;
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Policy that allows operator to configure "weights" for routing. This picks a
- * {@link WeightedRandomRouterPolicy} for the router and a {@link
+ * {@link LocalityRouterPolicy} for the router and a {@link
  * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
  * work together.
  */
@@ -40,7 +41,7 @@ public class WeightedLocalityPolicyManager
   public WeightedLocalityPolicyManager() {
     //this structurally hard-codes two compatible policies for Router and
     // AMRMProxy.
-    routerFederationPolicy = WeightedRandomRouterPolicy.class;
+    routerFederationPolicy = LocalityRouterPolicy.class;
     amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
     weightedPolicyInfo = new WeightedPolicyInfo();
   }
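With this change the manager pairs LocalityRouterPolicy with LocalityMulticastAMRMProxyPolicy, both driven by one WeightedPolicyInfo. A hedged sketch of how a per-queue weight configuration for this manager might be assembled, using only record types that appear elsewhere in the patch; the queue name and weight values are made up for illustration.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;

public final class WeightedLocalityConfigSketch {

  public static SubClusterPolicyConfiguration buildConfig()
      throws YarnException {
    // Router and AMRMProxy share the same weight map in this sketch.
    Map<SubClusterIdInfo, Float> weights = new HashMap<>();
    weights.put(new SubClusterIdInfo("subcluster1"), 0.7f);
    weights.put(new SubClusterIdInfo("subcluster2"), 0.3f);

    WeightedPolicyInfo policyInfo = new WeightedPolicyInfo();
    policyInfo.setRouterPolicyWeights(weights);
    policyInfo.setAMRMPolicyWeights(weights);

    // Serialize the weights and tag them with a policy type for "queue1",
    // mirroring how the tests in this patch build their configuration blob.
    ByteBuffer buf = policyInfo.toByteBuffer();
    return SubClusterPolicyConfiguration.newInstance("queue1",
        WeightedLocalityPolicyManager.class.getCanonicalName(), buf);
  }

  private WeightedLocalityConfigSketch() {
  }
}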
@@ -0,0 +1,196 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This policy selects the subcluster depending on the node where the Client
 * wants to run its application.
 *
 * It succeeds if:
 *
 * - There are three AMContainerResourceRequests in the order
 *   NODE, RACK, ANY
 *
 * It falls back to WeightedRandomRouterPolicy in case of:
 *
 * - Null or empty AMContainerResourceRequests;
 *
 * - A single AMContainerResourceRequest with ANY as its ResourceName;
 *
 * - The requested node is in a blacklisted SubCluster.
 *
 * It fails if:
 *
 * - The node does not exist and RelaxLocality is False;
 *
 * - The number of resource requests is invalid (not 0, 1 or 3).
 */
public class LocalityRouterPolicy extends WeightedRandomRouterPolicy {

  public static final Logger LOG =
      LoggerFactory.getLogger(LocalityRouterPolicy.class);

  private SubClusterResolver resolver;
  private List<SubClusterId> enabledSCs;

  @Override
  public void reinitialize(FederationPolicyInitializationContext policyContext)
      throws FederationPolicyInitializationException {
    super.reinitialize(policyContext);
    resolver = policyContext.getFederationSubclusterResolver();
    Map<SubClusterIdInfo, Float> weights =
        getPolicyInfo().getRouterPolicyWeights();
    enabledSCs = new ArrayList<SubClusterId>();
    for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
      if (entry != null && entry.getValue() > 0) {
        enabledSCs.add(entry.getKey().toId());
      }
    }
  }

  @Override
  public SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blackListSubClusters) throws YarnException {

    // null checks and default-queue behavior
    validate(appSubmissionContext);

    List<ResourceRequest> rrList =
        appSubmissionContext.getAMContainerResourceRequests();

    // Fast path for FailForward to WeightedRandomRouterPolicy
    if (rrList == null || rrList.isEmpty() || (rrList.size() == 1
        && ResourceRequest.isAnyLocation(rrList.get(0).getResourceName()))) {
      return super
          .getHomeSubcluster(appSubmissionContext, blackListSubClusters);
    }

    if (rrList.size() != 3) {
      throw new FederationPolicyException(
          "Invalid number of resource requests: " + rrList.size());
    }

    Map<SubClusterId, SubClusterInfo> activeSubClusters =
        getActiveSubclusters();
    List<SubClusterId> validSubClusters =
        new ArrayList<>(activeSubClusters.keySet());
    FederationPolicyUtils
        .validateSubClusterAvailability(validSubClusters, blackListSubClusters);
    if (blackListSubClusters != null) {
      // Remove the blacklisted SubClusters from the active set obtained
      // from the StateStore
      validSubClusters.removeAll(blackListSubClusters);
    }

    try {
      // With three requests, this has been processed by the
      // ResourceRequestInterceptorREST, and should have
      // node, rack, and any
      SubClusterId targetId = null;
      ResourceRequest nodeRequest = null;
      ResourceRequest rackRequest = null;
      ResourceRequest anyRequest = null;
      for (ResourceRequest rr : rrList) {
        // Handle "node" requests
        try {
          targetId = resolver.getSubClusterForNode(rr.getResourceName());
          nodeRequest = rr;
        } catch (YarnException e) {
          LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
        }
        // Handle "rack" requests
        try {
          resolver.getSubClustersForRack(rr.getResourceName());
          rackRequest = rr;
        } catch (YarnException e) {
          LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage());
        }
        // Handle "ANY" requests
        if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
          anyRequest = rr;
          continue;
        }
      }
      if (nodeRequest == null) {
        throw new YarnException("Missing node request");
      }
      if (rackRequest == null) {
        throw new YarnException("Missing rack request");
      }
      if (anyRequest == null) {
        throw new YarnException("Missing any request");
      }
      LOG.info(
          "Node request: " + nodeRequest.getResourceName() + ", Rack request: "
              + rackRequest.getResourceName() + ", Any request: " + anyRequest
              .getResourceName());
      // Handle "node" requests
      if (validSubClusters.contains(targetId) && enabledSCs
          .contains(targetId)) {
        LOG.info("Node {} is in SubCluster: {}", nodeRequest.getResourceName(),
            targetId);
        return targetId;
      } else {
        throw new YarnException("The node " + nodeRequest.getResourceName()
            + " is in a blacklist SubCluster or not active. ");
      }
    } catch (YarnException e) {
      LOG.error("Validating resource requests failed, Falling back to "
          + "WeightedRandomRouterPolicy placement: " + e.getMessage());
      // FailForward to WeightedRandomRouterPolicy
      // Overwrite request to use a default ANY
      ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
      amReq.setPriority(appSubmissionContext.getPriority());
      amReq.setResourceName(ResourceRequest.ANY);
      amReq.setCapability(appSubmissionContext.getResource());
      amReq.setNumContainers(1);
      amReq.setRelaxLocality(true);
      amReq.setNodeLabelExpression(
          appSubmissionContext.getNodeLabelExpression());
      amReq.setExecutionTypeRequest(
          ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
      appSubmissionContext
          .setAMContainerResourceRequests(Collections.singletonList(amReq));
      return super
          .getHomeSubcluster(appSubmissionContext, blackListSubClusters);
    }
  }
}
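In practice the policy expects the AM container resource requests on the submission context to follow the NODE, RACK, ANY pattern described in the javadoc above (the patch notes this is normally produced by ResourceRequestInterceptorREST). A small sketch of a request shape the policy can route on; the node and rack names are illustrative only.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.util.Records;

public final class LocalityRequestSketch {

  public static ApplicationSubmissionContext withLocality() {
    Resource amResource = Resource.newInstance(1024, 1);
    // One request per level of locality: the node we want, its rack, and ANY.
    List<ResourceRequest> amRequests = Arrays.asList(
        ResourceRequest.newInstance(Priority.newInstance(0), "node1",
            amResource, 1),
        ResourceRequest.newInstance(Priority.newInstance(0), "rack1",
            amResource, 1),
        ResourceRequest.newInstance(Priority.newInstance(0),
            ResourceRequest.ANY, amResource, 1));

    ApplicationSubmissionContext asc =
        Records.newRecord(ApplicationSubmissionContext.class);
    asc.setAMContainerResourceRequests(amRequests);
    return asc;
  }

  private LocalityRequestSketch() {
  }
}

The policy resolves "node1" to a subcluster through the SubClusterResolver; if the node cannot be resolved, or its subcluster is blacklisted, inactive, or has zero weight, the catch block above rewrites the list to a single ANY request and defers to WeightedRandomRouterPolicy.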
@@ -162,8 +162,8 @@ public abstract class BaseFederationPoliciesTest {
     return rand;
   }
 
-  public void setRand(Random rand) {
-    this.rand = rand;
+  public void setRand(long seed) {
+    this.rand.setSeed(seed);
   }
 
   public SubClusterId getHomeSubCluster() {
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.LocalityRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.junit.Assert;
@@ -63,7 +63,7 @@ public class TestWeightedLocalityPolicyManager extends
     //set expected params that the base test class will use for tests
     expectedPolicyManager = WeightedLocalityPolicyManager.class;
     expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
-    expectedRouterPolicy = WeightedRandomRouterPolicy.class;
+    expectedRouterPolicy = LocalityRouterPolicy.class;
   }
 
   @Test
@@ -0,0 +1,282 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Test class to validate the correctness of LocalityRouterPolicy.
 */
public class TestLocalityRouterPolicy extends TestWeightedRandomRouterPolicy {

  /*
   * The MachineList for the default Resolver has the following nodes:
   *
   * node1<=>subcluster1
   *
   * node2<=>subcluster2
   *
   * noDE3<=>subcluster3
   *
   * node4<=>subcluster3
   *
   * subcluster0-rack0-host0<=>subcluster0
   *
   * Subcluster1-RACK1-HOST1<=>subcluster1
   *
   * SUBCLUSTER1-RACK1-HOST2<=>subcluster1
   *
   * SubCluster2-RACK3-HOST3<=>subcluster2
   */

  @Before
  public void setUp() throws Exception {
    setPolicy(new LocalityRouterPolicy());
    setPolicyInfo(new WeightedPolicyInfo());

    configureWeights(4);

    initializePolicy(new YarnConfiguration());
  }

  private void initializePolicy(Configuration conf) throws YarnException {
    setFederationPolicyContext(new FederationPolicyInitializationContext());
    SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
    getFederationPolicyContext().setFederationSubclusterResolver(resolver);
    ByteBuffer buf = getPolicyInfo().toByteBuffer();
    getFederationPolicyContext().setSubClusterPolicyConfiguration(
        SubClusterPolicyConfiguration
            .newInstance("queue1", getPolicy().getClass().getCanonicalName(),
                buf));
    getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
    FederationPoliciesTestUtil
        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
            getPolicyInfo(), getActiveSubclusters(), conf);
  }

  /**
   * This test validates the correctness when the request has one node and
   * the node belongs to an active subcluster.
   */
  @Test
  public void testNodeInActiveSubCluster() throws YarnException {
    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1),
            1));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
            1));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
            Resource.newInstance(10, 1), 1));
    ApplicationSubmissionContext asc = ApplicationSubmissionContext
        .newInstance(null, null, null, null, null, false, false, 0,
            Resources.none(), null, false, null, null);
    asc.setAMContainerResourceRequests(requests);

    SubClusterId chosen =
        ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
    // If node1 is active, we should choose the subcluster with node1
    if (getActiveSubclusters().containsKey(
        getFederationPolicyContext().getFederationSubclusterResolver()
            .getSubClusterForNode("node1").getId())) {
      Assert.assertEquals(
          getFederationPolicyContext().getFederationSubclusterResolver()
              .getSubClusterForNode("node1"), chosen);
    }
    // Regardless, we should choose an active SubCluster
    Assert.assertTrue(getActiveSubclusters().containsKey(chosen));
  }

  /**
   * This test validates the correctness when the request has multiple
   * ResourceRequests. The tests without ResourceRequests are done in
   * TestWeightedRandomRouterPolicy.
   */
  @Test
  public void testMultipleResourceRequests() throws YarnException {
    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1),
            1));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "node2", Resource.newInstance(10, 1),
            1));
    ApplicationSubmissionContext asc = ApplicationSubmissionContext
        .newInstance(null, null, null, null, null, false, false, 0,
            Resources.none(), null, false, null, null);
    asc.setAMContainerResourceRequests(requests);
    try {
      ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
      Assert.fail();
    } catch (FederationPolicyException e) {
      Assert.assertTrue(
          e.getMessage().startsWith("Invalid number of resource requests: "));
    }
  }

  /**
   * This test validates the correctness when the request has one node and
   * the node does not exist in the Resolver MachineList file.
   */
  @Test
  public void testNodeNotExists() throws YarnException {
    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
    boolean relaxLocality = true;
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "node5", Resource.newInstance(10, 1),
            1, relaxLocality));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
            1));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
            Resource.newInstance(10, 1), 1));
    ApplicationSubmissionContext asc = ApplicationSubmissionContext
        .newInstance(null, null, null, null, null, false, false, 0,
            Resources.none(), null, false, null, null);
    asc.setAMContainerResourceRequests(requests);

    try {
      ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
    } catch (FederationPolicyException e) {
      Assert.fail();
    }
  }

  /**
   * This test validates the correctness when the request has one node and
   * the node is in a blacklisted SubCluster.
   */
  @Test
  public void testNodeInABlacklistSubCluster() throws YarnException {
    // Blacklist SubCluster3
    String subClusterToBlacklist = "subcluster3";
    // Remember the current value of subcluster3
    Float value =
        getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist);
    getPolicyInfo().getRouterPolicyWeights()
        .put(new SubClusterIdInfo(subClusterToBlacklist), 0.0f);
    initializePolicy(new YarnConfiguration());

    FederationPoliciesTestUtil
        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
            getPolicyInfo(), getActiveSubclusters(), new Configuration());

    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
    boolean relaxLocality = true;
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1),
            1, relaxLocality));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
            1));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
            Resource.newInstance(10, 1), 1));
    ApplicationSubmissionContext asc = ApplicationSubmissionContext
        .newInstance(null, null, null, null, null, false, false, 0,
            Resources.none(), null, false, null, null);
    asc.setAMContainerResourceRequests(requests);

    try {
      SubClusterId targetId =
          ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
      // The selected subcluster must not be the blacklisted one.
      Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist);
    } catch (FederationPolicyException e) {
      Assert.fail();
    }

    // Restore the previous value for the other tests
    getPolicyInfo().getRouterPolicyWeights()
        .put(new SubClusterIdInfo(subClusterToBlacklist), value);
  }

  /**
   * This test validates the correctness when the request has one node and
   * the node is not in the policy weights.
   */
  @Test
  public void testNodeNotInPolicy() throws YarnException {
    // Remove subcluster3 from the policy weights
    String subClusterToBlacklist = "subcluster3";
    // Remember the current value of subcluster3
    Float value =
        getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist);
    getPolicyInfo().getRouterPolicyWeights().remove(subClusterToBlacklist);
    initializePolicy(new YarnConfiguration());

    FederationPoliciesTestUtil
        .initializePolicyContext(getFederationPolicyContext(), getPolicy(),
            getPolicyInfo(), getActiveSubclusters(), new Configuration());

    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
    boolean relaxLocality = true;
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1),
            1, relaxLocality));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1),
            1));
    requests.add(ResourceRequest
        .newInstance(Priority.UNDEFINED, ResourceRequest.ANY,
            Resource.newInstance(10, 1), 1));
    ApplicationSubmissionContext asc = ApplicationSubmissionContext
        .newInstance(null, null, null, null, null, false, false, 0,
            Resources.none(), null, false, null, null);
    asc.setAMContainerResourceRequests(requests);

    try {
      SubClusterId targetId =
          ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null);
      // The selected subcluster must not be the blacklisted one.
      Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist);
    } catch (FederationPolicyException e) {
      Assert.fail();
    }

    // Restore the previous value for the other tests
    getPolicyInfo().getRouterPolicyWeights()
        .put(new SubClusterIdInfo(subClusterToBlacklist), value);
  }
}
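Beyond the weight-based exclusion exercised above, callers can also pass an explicit blacklist to getHomeSubcluster. A brief sketch under the assumption that SubClusterId.newInstance(String) is available and that a configured router policy and submission context like the ones built in these tests are at hand; names are illustrative.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

public final class BlacklistRoutingSketch {

  /** Route once while excluding a single subcluster by id. */
  public static SubClusterId routeAvoiding(FederationRouterPolicy policy,
      ApplicationSubmissionContext asc, String subClusterIdToAvoid)
      throws YarnException {
    List<SubClusterId> blacklist = Collections
        .singletonList(SubClusterId.newInstance(subClusterIdToAvoid));
    // The policy drops blacklisted subclusters from the candidate set and
    // falls back to weighted-random placement if the requested node lands
    // in one of them.
    return policy.getHomeSubcluster(asc, blacklist);
  }

  private BlacklistRoutingSketch() {
  }
}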
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -47,10 +48,21 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
   public void setUp() throws Exception {
     setPolicy(new WeightedRandomRouterPolicy());
     setPolicyInfo(new WeightedPolicyInfo());
+
+    configureWeights(20);
+
+    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+        getPolicyInfo(), getActiveSubclusters());
+  }
+
+  public void configureWeights(float numSubClusters) {
+    // Set random seed to remove random failures
+    FederationPolicyUtils.setRand(5);
+    setRand(5);
+
     Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
     Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
 
-    float numSubClusters = 20;
     // simulate N subclusters each with a 5% chance of being inactive
     for (int i = 0; i < numSubClusters; i++) {
       SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
@@ -74,10 +86,6 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
     }
     getPolicyInfo().setRouterPolicyWeights(routerWeights);
     getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
-
-    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
-        getPolicyInfo(), getActiveSubclusters());
-
   }
 
   @Test