diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index 1ccd61cae4d..e5f26d8f905 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -123,6 +126,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { public static final Logger LOG = LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class); + private static Random rand = new Random(); + private Map weights; private SubClusterResolver resolver; @@ -275,26 +280,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { } // Handle node/rack requests that the SubClusterResolver cannot map to - // any cluster. Defaulting to home subcluster. + // any cluster. Pick a random sub-cluster from active and enabled ones. + targetId = getSubClusterForUnResolvedRequest(bookkeeper, + rr.getAllocationRequestId()); if (LOG.isDebugEnabled()) { LOG.debug("ERROR resolving sub-cluster for resourceName: " - + rr.getResourceName() + " we are falling back to homeSubCluster:" - + homeSubcluster); + + rr.getResourceName() + ", picked a random subcluster to forward:" + + targetId); } - - // If home-subcluster is not active, ignore node/rack request - if (bookkeeper.isActiveAndEnabled(homeSubcluster)) { - if (targetIds != null && targetIds.size() > 0) { - bookkeeper.addRackRR(homeSubcluster, rr); - } else { - bookkeeper.addLocalizedNodeRR(homeSubcluster, rr); - } + if (targetIds != null && targetIds.size() > 0) { + bookkeeper.addRackRR(targetId, rr); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are " - + "defaulting to is not active, the ResourceRequest " - + "will be ignored."); - } + bookkeeper.addLocalizedNodeRR(targetId, rr); } } @@ -313,6 +310,14 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { return bookkeeper.getAnswer(); } + /** + * For unit test to override. + */ + protected SubClusterId getSubClusterForUnResolvedRequest( + AllocationBookkeeper bookKeeper, long allocationId) { + return bookKeeper.getSubClusterForUnResolvedRequest(allocationId); + } + /** * It splits a list of non-localized resource requests among sub-clusters. */ @@ -512,10 +517,11 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * This helper class is used to book-keep the requests made to each * subcluster, and maintain useful statistics to split ANY requests. */ - private final class AllocationBookkeeper { + protected final class AllocationBookkeeper { // the answer being accumulated private Map> answer = new TreeMap<>(); + private Map> maskForRackDeletion = new HashMap<>(); // stores how many containers we have allocated in each RM for localized // asks, used to correctly "spread" the corresponding ANY @@ -523,6 +529,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { new HashMap<>(); private Map totNumLocalizedContainers = new HashMap<>(); + // Store the randomly selected subClusterId for unresolved resource requests + // keyed by requestId + private Map unResolvedRequestLocation = new HashMap<>(); + private Set activeAndEnabledSC = new HashSet<>(); private float totHeadroomMemory = 0; private int totHeadRoomEnabledRMs = 0; @@ -538,6 +548,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // reset data structures answer.clear(); + maskForRackDeletion.clear(); countContainersPerRM.clear(); totNumLocalizedContainers.clear(); activeAndEnabledSC.clear(); @@ -628,16 +639,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { .addAndGet(rr.getNumContainers()); } - internalAddToAnswer(targetId, rr); + internalAddToAnswer(targetId, rr, false); } /** * Add a rack-local request to the final asnwer. */ - public void addRackRR(SubClusterId targetId, ResourceRequest rr) { + private void addRackRR(SubClusterId targetId, ResourceRequest rr) { Preconditions .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); - internalAddToAnswer(targetId, rr); + internalAddToAnswer(targetId, rr, true); } /** @@ -646,17 +657,45 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { Preconditions .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName())); - internalAddToAnswer(targetId, rr); + internalAddToAnswer(targetId, rr, false); } private void internalAddToAnswer(SubClusterId targetId, - ResourceRequest partialRR) { + ResourceRequest partialRR, boolean isRack) { + if (!isRack) { + if (!maskForRackDeletion.containsKey(targetId)) { + maskForRackDeletion.put(targetId, new HashSet()); + } + maskForRackDeletion.get(targetId) + .add(partialRR.getAllocationRequestId()); + } if (!answer.containsKey(targetId)) { answer.put(targetId, new ArrayList()); } answer.get(targetId).add(partialRR); } + /** + * For requests whose location cannot be resolved, choose an active and + * enabled sub-cluster to forward this requestId to. + */ + private SubClusterId getSubClusterForUnResolvedRequest(long allocationId) { + if (unResolvedRequestLocation.containsKey(allocationId)) { + return unResolvedRequestLocation.get(allocationId); + } + int id = rand.nextInt(activeAndEnabledSC.size()); + for (SubClusterId subclusterId : activeAndEnabledSC) { + if (id == 0) { + unResolvedRequestLocation.put(allocationId, subclusterId); + return subclusterId; + } + id--; + } + throw new RuntimeException( + "Should not be here. activeAndEnabledSC size = " + + activeAndEnabledSC.size() + " id = " + id); + } + /** * Return all known subclusters associated with an allocation id. * @@ -678,6 +717,28 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { * @return the answer */ private Map> getAnswer() { + Iterator>> answerIter = + answer.entrySet().iterator(); + // Remove redundant rack RR before returning the answer + while (answerIter.hasNext()) { + Entry> entry = answerIter.next(); + SubClusterId scId = entry.getKey(); + Set mask = maskForRackDeletion.get(scId); + if (mask != null) { + Iterator rrIter = entry.getValue().iterator(); + while (rrIter.hasNext()) { + ResourceRequest rr = rrIter.next(); + if (!mask.contains(rr.getAllocationRequestId())) { + rrIter.remove(); + } + } + } + if (mask == null || entry.getValue().size() == 0) { + answerIter.remove(); + LOG.info("removing {} from output because it has only rack RR", + scId); + } + } return answer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index cf9ac5347cd..c49ab60a8d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -69,12 +69,12 @@ public class TestLocalityMulticastAMRMProxyPolicy @Before public void setUp() throws Exception { - setPolicy(new LocalityMulticastAMRMProxyPolicy()); + setPolicy(new TestableLocalityMulticastAMRMProxyPolicy()); setPolicyInfo(new WeightedPolicyInfo()); Map routerWeights = new HashMap<>(); Map amrmWeights = new HashMap<>(); - // simulate 20 subclusters with a 5% chance of being inactive + // Six sub-clusters with one inactive and one disabled for (int i = 0; i < 6; i++) { SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i); // sub-cluster 3 is not active @@ -207,6 +207,7 @@ public class TestLocalityMulticastAMRMProxyPolicy getPolicyInfo().setHeadroomAlpha(1.0f); initializePolicy(); + addHomeSubClusterAsActive(); int numRR = 1000; List resourceRequests = createLargeRandomList(numRR); @@ -324,14 +325,11 @@ public class TestLocalityMulticastAMRMProxyPolicy null, Collections. emptyList()); } - @Test - public void testSplitAllocateRequest() throws Exception { - - // Test a complex List is split correctly - initializePolicy(); - - // modify default initialization to include a "homesubcluster" - // which we will use as the default for when nodes or racks are unknown + /** + * modify default initialization to include a "homesubcluster" which we will + * use as the default for when nodes or racks are unknown. + */ + private void addHomeSubClusterAsActive() { SubClusterInfo sci = mock(SubClusterInfo.class); when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); when(sci.getSubClusterId()).thenReturn(getHomeSubCluster()); @@ -340,6 +338,14 @@ public class TestLocalityMulticastAMRMProxyPolicy getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f); getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f); + } + + @Test + public void testSplitAllocateRequest() throws Exception { + + // Test a complex List is split correctly + initializePolicy(); + addHomeSubClusterAsActive(); FederationPoliciesTestUtil.initializePolicyContext( getFederationPolicyContext(), getPolicy(), getPolicyInfo(), @@ -502,7 +508,8 @@ public class TestLocalityMulticastAMRMProxyPolicy // Test target Ids for (SubClusterId targetId : split.keySet()) { - Assert.assertTrue("Target subclusters should be in the active set", + Assert.assertTrue( + "Target subcluster " + targetId + " should be in the active set", getActiveSubclusters().containsKey(targetId)); Assert.assertTrue( "Target subclusters (" + targetId + ") should have weight >0 in " @@ -787,4 +794,28 @@ public class TestLocalityMulticastAMRMProxyPolicy checkTotalContainerAllocation(response, 100); } + /** + * A testable version of LocalityMulticastAMRMProxyPolicy that + * deterministically falls back to home sub-cluster for unresolved requests. + */ + private class TestableLocalityMulticastAMRMProxyPolicy + extends LocalityMulticastAMRMProxyPolicy { + @Override + protected SubClusterId getSubClusterForUnResolvedRequest( + AllocationBookkeeper bookkeeper, long allocationId) { + SubClusterId originalResult = + super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId); + Map activeClusters = null; + try { + activeClusters = getActiveSubclusters(); + } catch (YarnException e) { + throw new RuntimeException(e); + } + // The randomly selected sub-cluster should at least be active + Assert.assertTrue(activeClusters.containsKey(originalResult)); + + // Alwasy use home sub-cluster so that unit test is deterministic + return getHomeSubCluster(); + } + } } \ No newline at end of file