YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru).

(cherry picked from commit f8e5de5969)
This commit is contained in:
Subru Krishnan 2017-06-21 19:08:47 -07:00 committed by Carlo Curino
parent 433ee44b58
commit 4cfec943b1
17 changed files with 157 additions and 43 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -110,16 +111,22 @@ public class RouterPolicyFacade {
* This method provides a wrapper of all policy functionalities for routing .
* Internally it manages configuration changes, and policy init/reinit.
*
* @param appSubmissionContext the application to route.
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @return the id of the subcluster that will be the "home" for this
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return the {@link SubClusterId} that will be the "home" for this
* application.
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this app.
*/
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// the maps are concurrent, but we need to protect from reset()
// reinitialization mid-execution by creating a new reference local to this
@ -186,7 +193,7 @@ public class RouterPolicyFacade {
+ "and no default specified.");
}
return policy.getHomeSubcluster(appSubmissionContext);
return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
}
/**

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@ -29,16 +31,22 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
/**
* Determines the sub-cluster that the user application submision should be
* Determines the sub-cluster that the user application submission should be
* routed to.
*
* @param appSubmissionContext the context for the app being submitted.
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @return the sub-cluster as identified by {@link SubClusterId} to route the
* request to.
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return the {@link SubClusterId} that will be the "home" for this
* application.
*
* @throws YarnException if the policy cannot determine a viable subcluster.
*/
SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException;
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException;
}

View File

@ -55,19 +55,35 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
* sub-cluster, as far as the number of active sub-cluster and their names
* remain the same.
*
* @param appSubmissionContext the context for the app being submitted.
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @return a hash-based chosen subcluster.
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return a hash-based chosen {@link SubClusterId} that will be the "home"
* for this application.
*
* @throws YarnException if there are no active subclusters.
*/
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// throws if no active subclusters available
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
for (SubClusterId scId : blackListSubClusters) {
activeSubclusters.remove(scId);
}
}
validate(appSubmissionContext);
int chosenPosition = Math.abs(

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -62,7 +63,8 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@ -76,6 +78,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
long currBestMem = -1;
for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
.entrySet()) {
if (blacklist != null && blacklist.contains(entry.getKey())) {
continue;
}
SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
if (weights.containsKey(id) && weights.get(id) > 0) {
long availableMemory = getAvailableMemory(entry.getValue());

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@ -50,6 +52,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
Float currentBest = Float.MIN_VALUE;
for (SubClusterId id : activeSubclusters.keySet()) {
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (blacklist != null && blacklist.contains(id)) {
continue;
}
if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
currentBest = weights.get(idInfo);
chosen = id;

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@ -27,8 +29,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* This {@link FederationRouterPolicy} simply rejects all incoming requests.
* This is useful to prevent applications running in a queue to be run
* anywhere in the federated cluster.
* This is useful to prevent applications running in a queue to be run anywhere
* in the federated cluster.
*/
public class RejectRouterPolicy extends AbstractRouterPolicy {
@ -44,23 +46,31 @@ public class RejectRouterPolicy extends AbstractRouterPolicy {
/**
* The policy always reject requests.
*
* @param appSubmissionContext the context for the app being submitted.
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return (never).
*
* @throws YarnException (always) to prevent applications in this queue to
* be run anywhere in the federated cluster.
* @throws YarnException (always) to prevent applications in this queue to be
* run anywhere in the federated cluster.
*/
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// run standard validation, as error might differ
validate(appSubmissionContext);
throw new FederationPolicyException("The policy configured for this queue"
+ " (" + appSubmissionContext.getQueue() + ") reject all routing "
+ "requests by construction. Application " + appSubmissionContext
.getApplicationId() + " cannot be routed to any RM.");
+ "requests by construction. Application "
+ appSubmissionContext.getApplicationId()
+ " cannot be routed to any RM.");
}
}

View File

@ -59,18 +59,24 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
}
/**
* Simply picks a random active subcluster to start the AM (this does NOT
* Simply picks a random active subCluster to start the AM (this does NOT
* depend on the weights in the policy).
*
* @param appSubmissionContext the context for the app being submitted
* (ignored).
* @param appSubmissionContext the {@link ApplicationSubmissionContext} that
* has to be routed to an appropriate subCluster for execution.
*
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
*
* @return a randomly chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@ -79,6 +85,15 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
getActiveSubclusters();
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
for (SubClusterId scId : blackListSubClusters) {
list.remove(scId);
}
}
return list.get(rand.nextInt(list.size()));
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -41,7 +42,8 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@ -58,6 +60,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
float totActiveWeight = 0;
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
continue;
}
if (entry.getKey() != null
&& activeSubclusters.containsKey(entry.getKey().toId())) {
totActiveWeight += entry.getValue();
@ -66,6 +71,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
float lookupValue = rand.nextFloat() * totActiveWeight;
for (SubClusterId id : activeSubclusters.keySet()) {
if (blacklist != null && blacklist.contains(id)) {
continue;
}
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (weights.containsKey(idInfo)) {
lookupValue -= weights.get(idInfo);
@ -77,4 +85,5 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
// should never happen
return null;
}
}

View File

@ -104,7 +104,7 @@ public abstract class BaseFederationPoliciesTest {
ConfigurableFederationPolicy localPolicy = getPolicy();
if (localPolicy instanceof FederationRouterPolicy) {
((FederationRouterPolicy) localPolicy)
.getHomeSubcluster(getApplicationSubmissionContext());
.getHomeSubcluster(getApplicationSubmissionContext(), null);
} else {
String[] hosts = new String[] {"host1", "host2"};
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil

View File

@ -95,7 +95,7 @@ public class TestRouterPolicyFacade {
// first call runs using standard UniformRandomRouterPolicy
SubClusterId chosen =
routerFacade.getHomeSubcluster(applicationSubmissionContext);
routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof UniformRandomRouterPolicy);
@ -107,7 +107,7 @@ public class TestRouterPolicyFacade {
.newInstance(getPriorityPolicy(queue1)));
// second call is routed by new policy PriorityRouterPolicy
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(chosen.equals(subClusterIds.get(0)));
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof PriorityRouterPolicy);
@ -126,7 +126,7 @@ public class TestRouterPolicyFacade {
// when invoked it returns the expected SubClusterId.
SubClusterId chosen =
routerFacade.getHomeSubcluster(applicationSubmissionContext);
routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
// now the caching of policies must have added an entry for this queue
@ -160,19 +160,19 @@ public class TestRouterPolicyFacade {
String uninitQueue = "non-initialized-queue";
when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
SubClusterId chosen =
routerFacade.getHomeSubcluster(applicationSubmissionContext);
routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
// empty string
when(applicationSubmissionContext.getQueue()).thenReturn("");
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
// null queue also falls back to default
when(applicationSubmissionContext.getQueue()).thenReturn(null);
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));

View File

@ -18,11 +18,19 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@ -40,12 +48,43 @@ public abstract class BaseRouterPoliciesTest
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
SubClusterId chosen =
localPolicy.getHomeSubcluster(applicationSubmissionContext);
localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertNotNull(chosen);
}
@Test(expected = FederationPolicyException.class)
public void testNullAppContext() throws YarnException {
((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null);
((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null, null);
}
@Test
public void testBlacklistSubcluster() throws YarnException {
FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
Map<SubClusterId, SubClusterInfo> activeSubClusters =
getActiveSubclusters();
if (activeSubClusters != null && activeSubClusters.size() > 1
&& !(localPolicy instanceof RejectRouterPolicy)) {
// blacklist all the active subcluster but one.
Random random = new Random();
List<SubClusterId> blacklistSubclusters =
new ArrayList<SubClusterId>(activeSubClusters.keySet());
SubClusterId removed = blacklistSubclusters
.remove(random.nextInt(blacklistSubclusters.size()));
// bias LoadBasedRouterPolicy
getPolicyInfo().getRouterPolicyWeights()
.put(new SubClusterIdInfo(removed), 1.0f);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), getActiveSubclusters());
SubClusterId chosen = localPolicy.getHomeSubcluster(
applicationSubmissionContext, blacklistSubclusters);
// check that the selected sub-cluster is only one not blacklisted
Assert.assertNotNull(chosen);
Assert.assertEquals(removed, chosen);
}
}
}

View File

@ -70,7 +70,7 @@ public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest {
for (int i = 0; i < jobPerSub * numSubclusters; i++) {
when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i);
chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(applicationSubmissionContext);
.getHomeSubcluster(applicationSubmissionContext, null);
counter.get(chosen).addAndGet(1);
}

View File

@ -97,7 +97,7 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
public void testLoadIsRespected() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
.getHomeSubcluster(getApplicationSubmissionContext(), null);
// check the "planted" best cluster is chosen
Assert.assertEquals("sc05", chosen.getId());

View File

@ -78,7 +78,7 @@ public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
@Test
public void testPickLowestWeight() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
.getHomeSubcluster(getApplicationSubmissionContext(), null);
Assert.assertEquals("sc5", chosen.getId());
}

View File

@ -47,7 +47,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
@Test(expected = FederationPolicyException.class)
public void testNoClusterIsChosen() throws YarnException {
((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
.getHomeSubcluster(getApplicationSubmissionContext(), null);
}
@Override
@ -57,7 +57,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
localPolicy.getHomeSubcluster(applicationSubmissionContext);
localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
}
}

View File

@ -57,7 +57,7 @@ public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest {
@Test
public void testOneSubclusterIsChosen() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
.getHomeSubcluster(getApplicationSubmissionContext(), null);
Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen));
}

View File

@ -98,7 +98,7 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
for (float i = 0; i < numberOfDraws; i++) {
SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext());
.getHomeSubcluster(getApplicationSubmissionContext(), null);
counter.get(chosenId).incrementAndGet();
}