From b5ec85d96615e8214c14b57f8980a1dee6197ffa Mon Sep 17 00:00:00 2001
From: Botong Huang
Date: Sun, 11 Nov 2018 11:12:53 -0800
Subject: [PATCH] YARN-8933. [AMRMProxy] Fix potential empty fields in
 allocation response, move SubClusterTimeout to FederationInterceptor.
 Contributed by Botong Huang.
---
 .../amrmproxy/BroadcastAMRMProxyPolicy.java   |   4 +-
 .../amrmproxy/FederationAMRMProxyPolicy.java  |   8 +-
 .../amrmproxy/HomeAMRMProxyPolicy.java        |   5 +-
 .../LocalityMulticastAMRMProxyPolicy.java     |  52 ++-----
 .../amrmproxy/RejectAMRMProxyPolicy.java      |   4 +-
 .../policies/BaseFederationPoliciesTest.java  |   5 +-
 ...estBroadcastAMRMProxyFederationPolicy.java |  10 +-
 .../amrmproxy/TestHomeAMRMProxyPolicy.java    |  10 +-
 .../TestLocalityMulticastAMRMProxyPolicy.java |  41 +++---
 .../amrmproxy/TestRejectAMRMProxyPolicy.java  |   5 +-
 .../amrmproxy/FederationInterceptor.java      | 129 ++++++++++++++++--
 .../amrmproxy/TestFederationInterceptor.java  |  63 ++++++++-
 12 files changed, 236 insertions(+), 100 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
index eb83baa8441..643bfa6da01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -49,7 +50,8 @@ public void reinitialize(
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
 
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
index 0541df4346c..3d39d7280d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
@@ -19,6 +19,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,15 +40,16 @@ public interface FederationAMRMProxyPolicy
    *
    * @param resourceRequests the list of {@link ResourceRequest}s from the AM to
    *          be split
-   *
+   * @param timedOutSubClusters the set of sub-clusters that haven't had a
+   *          successful heartbeat response for a while.
    * @return map of sub-cluster as identified by {@link SubClusterId} to the
    *         list of {@link ResourceRequest}s that should be forwarded to it
-   *
    * @throws YarnException in case the request is malformed or no viable
    *           sub-clusters can be found.
    */
   Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException;
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException;
 
   /**
    * This method should be invoked to notify the policy about responses being
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
index 5dd5c531889..acb7e0af183 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -55,8 +56,8 @@ public void reinitialize(
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
-
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
     if (homeSubcluster == null) {
       throw new FederationPolicyException("No home subcluster available");
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index e5f26d8f905..47d23e07538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -132,11 +131,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
   private SubClusterResolver resolver;
 
   private Map<SubClusterId, Resource> headroom;
-  private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
-  private long subClusterTimeOut;
   private float hrAlpha;
   private FederationStateStoreFacade federationFacade;
-  private AllocationBookkeeper bookkeeper;
   private SubClusterId homeSubcluster;
 
   @Override
@@ -186,26 +182,12 @@ public void reinitialize(
 
     if (headroom == null) {
       headroom = new ConcurrentHashMap<>();
-      lastHeartbeatTimeStamp = new ConcurrentHashMap<>();
     }
     hrAlpha = policy.getHeadroomAlpha();
 
     this.federationFacade =
         policyContext.getFederationStateStoreFacade();
     this.homeSubcluster = policyContext.getHomeSubcluster();
-
-    this.subClusterTimeOut = this.federationFacade.getConf().getLong(
-        YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
-        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
-    if (this.subClusterTimeOut <= 0) {
-      LOG.info(
-          "{} configured to be {}, should be positive. Using default of {}.",
-          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
-          this.subClusterTimeOut,
-          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
-      this.subClusterTimeOut =
-          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
-    }
   }
 
   @Override
@@ -216,18 +198,18 @@ public void notifyOfResponse(SubClusterId subClusterId,
       LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
           response.getAvailableResources().getMemorySize());
     }
-    lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
   }
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
 
     // object used to accumulate statistics about the answer, initialize with
     // active subclusters. Create a new instance per call because this method
     // can be called concurrently.
-    bookkeeper = new AllocationBookkeeper();
-    bookkeeper.reinitialize(federationFacade.getSubClusters(true));
+    AllocationBookkeeper bookkeeper = new AllocationBookkeeper();
+    bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters);
 
     List<ResourceRequest> nonLocalizedRequests =
         new ArrayList<ResourceRequest>();
@@ -298,15 +280,6 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
     // handle all non-localized requests (ANY)
     splitAnyRequests(nonLocalizedRequests, bookkeeper);
 
-    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper
-        .getAnswer().entrySet()) {
-      // A new-cluster here will trigger new UAM luanch, which might take a long
-      // time. We don't want too many requests stuck in this UAM before it is
-      // ready and starts heartbeating
-      if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) {
-        lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis());
-      }
-    }
     return bookkeeper.getAnswer();
   }
 
@@ -540,8 +513,8 @@ protected final class AllocationBookkeeper {
     private float totPolicyWeight = 0;
 
     private void reinitialize(
-        Map<SubClusterId, SubClusterInfo> activeSubclusters)
-        throws YarnException {
+        Map<SubClusterId, SubClusterInfo> activeSubclusters,
+        Set<SubClusterId> timedOutSubClusters) throws YarnException {
       if (activeSubclusters == null) {
         throw new YarnRuntimeException("null activeSubclusters received");
       }
@@ -573,17 +546,8 @@ private void reinitialize(
       }
 
       Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
-      for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp
-          .entrySet()) {
-        long duration = System.currentTimeMillis() - entry.getValue();
-        if (duration > subClusterTimeOut) {
-          LOG.warn(
-              "Subcluster {} does not have a success heartbeat for {}s, "
-                  + "skip routing asks there for this request",
-              entry.getKey(), (double) duration / 1000);
-          tmpSCSet.remove(entry.getKey());
-        }
-      }
+      tmpSCSet.removeAll(timedOutSubClusters);
+
       if (tmpSCSet.size() < 1) {
         LOG.warn("All active and enabled subclusters have expired last "
             + "heartbeat time. Ignore the expiry check for this request");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
index bed037e463d..a21234e4a55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -47,7 +48,8 @@ public void reinitialize(FederationPolicyInitializationContext policyContext)
 
   @Override
   public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
-      List<ResourceRequest> resourceRequests) throws YarnException {
+      List<ResourceRequest> resourceRequests,
+      Set<SubClusterId> timedOutSubClusters) throws YarnException {
     throw new FederationPolicyException("The policy configured for this queue "
         + "rejects all routing requests by construction.");
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 23978ed886e..57d3c67ec0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -23,6 +23,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -109,8 +110,8 @@ public void testNoSubclusters() throws YarnException {
     String[] hosts = new String[] {"host1", "host2"};
     List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
-    ((FederationAMRMProxyPolicy) localPolicy)
-        .splitResourceRequests(resourceRequests);
+    ((FederationAMRMProxyPolicy) localPolicy).splitResourceRequests(
+        resourceRequests, new HashSet<SubClusterId>());
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
index df5da85a915..52f36a40baf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
@@ -21,6 +21,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -28,7 +29,6 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -71,8 +71,8 @@ public void testSplitAllocateRequest() throws Exception {
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
     Assert.assertTrue(response.size() == 2);
     for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
         .entrySet()) {
@@ -94,8 +94,8 @@ public void testNotifyOfResponseFromUnknownSubCluster() throws Exception {
     List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
         SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
index 90a6aeb5951..1f57c1fe5d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java
@@ -26,6 +26,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -80,9 +81,9 @@ public void testSplitAllocateRequest() throws YarnException {
         hosts, 2 * 1024, 2, 1, 3, null, false);
 
     HomeAMRMProxyPolicy federationPolicy =
-        (HomeAMRMProxyPolicy)getPolicy();
-    Map<SubClusterId, List<ResourceRequest>> response =
-        federationPolicy.splitResourceRequests(resourceRequests);
+        (HomeAMRMProxyPolicy) getPolicy();
+    Map<SubClusterId, List<ResourceRequest>> response = federationPolicy
+        .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
     assertEquals(1, response.size());
     assertNotNull(response.get(HOME_SC_ID));
     assertEquals(9, response.get(HOME_SC_ID).size());
@@ -101,7 +102,8 @@ public void testHomeSubclusterNotActive() throws YarnException {
       List<ResourceRequest> resourceRequests = createResourceRequests(
          hosts, 2 * 1024, 2, 1, 3, null, false);
       HomeAMRMProxyPolicy federationPolicy = (HomeAMRMProxyPolicy)getPolicy();
-      federationPolicy.splitResourceRequests(resourceRequests);
+      federationPolicy.splitResourceRequests(resourceRequests,
+          new HashSet<SubClusterId>());
       fail("It should fail when the home subcluster is not active");
     } catch(FederationPolicyException e) {
       GenericTestUtils.assertExceptionContains("is not active", e);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index c49ab60a8d8..10359e44458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -154,8 +154,8 @@ public void testSplitBasedOnHeadroom() throws Exception {
     prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     // pretty print requests
     LOG.info("Initial headroom");
@@ -180,7 +180,7 @@ public void testSplitBasedOnHeadroom() throws Exception {
     ((FederationAMRMProxyPolicy) getPolicy())
         .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
     response = ((FederationAMRMProxyPolicy) getPolicy())
-        .splitResourceRequests(resourceRequests);
+        .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
 
     LOG.info("After headroom update");
     prettyPrintRequests(response);
@@ -218,8 +218,8 @@ public void testStressPolicy() throws Exception {
     long tstart = System.currentTimeMillis();
     for (int i = 0; i < numIterations; i++) {
       Map<SubClusterId, List<ResourceRequest>> response =
-          ((FederationAMRMProxyPolicy) getPolicy())
-              .splitResourceRequests(resourceRequests);
+          ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+              resourceRequests, new HashSet<SubClusterId>());
       validateSplit(response, resourceRequests);
     }
     long tend = System.currentTimeMillis();
@@ -243,8 +243,8 @@ public void testFWDAllZeroANY() throws Exception {
     prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     // we expect all three to appear for a zero-sized ANY
 
@@ -279,8 +279,8 @@ public void testSplitBasedOnHeadroomAndWeights() throws Exception {
     prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     // pretty print requests
     prettyPrintRequests(response);
@@ -354,8 +354,8 @@ public void testSplitAllocateRequest() throws Exception {
     List<ResourceRequest> resourceRequests = createComplexRequest();
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     validateSplit(response, resourceRequests);
     prettyPrintRequests(response);
@@ -697,8 +697,8 @@ public void testCancelWithLocalizedResource() throws YarnException {
         ResourceRequest.ANY, 1024, 1, 1, 0, null, false));
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
 
     checkExpectedAllocation(response, "subcluster0", 3, 1);
     checkExpectedAllocation(response, "subcluster1", 1, 0);
@@ -717,7 +717,7 @@ public void testCancelWithLocalizedResource() throws YarnException {
         ResourceRequest.ANY, 1024, 1, 1, 100, null, false));
 
     response = ((FederationAMRMProxyPolicy) getPolicy())
-        .splitResourceRequests(resourceRequests);
+        .splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
 
     /*
      * Since node request is a cancel, it should not be considered associated
@@ -750,12 +750,13 @@ public void testSubClusterExpiry() throws Exception {
     initializePolicy(conf);
     List<ResourceRequest> resourceRequests = createSimpleRequest();
 
-    // Update the response timestamp for the first time
     prepPolicyWithHeadroom(true);
 
+    // For the first time, no sub-cluster has expired
+    Set<SubClusterId> expiredSCList = new HashSet<>();
     Map<SubClusterId, List<ResourceRequest>> response =
         ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+            .splitResourceRequests(resourceRequests, expiredSCList);
 
     // pretty print requests
     prettyPrintRequests(response);
@@ -776,11 +777,11 @@ public void testSubClusterExpiry() throws Exception {
 
     Thread.sleep(800);
 
-    // Update the response timestamp for the second time, skipping sc0 and sc5
-    prepPolicyWithHeadroom(false);
-
+    // For the second time, sc0 and sc5 have expired
+    expiredSCList.add(SubClusterId.newInstance("subcluster0"));
+    expiredSCList.add(SubClusterId.newInstance("subcluster5"));
     response = ((FederationAMRMProxyPolicy) getPolicy())
-        .splitResourceRequests(resourceRequests);
+        .splitResourceRequests(resourceRequests, expiredSCList);
 
     // pretty print requests
     prettyPrintRequests(response);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
index 41e7fed2194..655582941f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestRejectAMRMProxyPolicy.java
@@ -21,6 +21,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -69,8 +70,8 @@ public void testSplitAllocateRequest() throws Exception {
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
 
     Map<SubClusterId, List<ResourceRequest>> response =
-        ((FederationAMRMProxyPolicy) getPolicy())
-            .splitResourceRequests(resourceRequests);
+        ((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
+            resourceRequests, new HashSet<SubClusterId>());
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ae9f78df2a0..dc10f957380 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -60,6 +61,7 @@
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
@@ -163,10 +165,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   /**
    * Stores the AllocateResponses that are received asynchronously from all the
-   * sub-cluster resource managers, including home RM.
+   * sub-cluster resource managers, including home RM, but not yet merged and
+   * returned to the AM.
    */
   private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
 
+  /**
+   * Remembers the last allocate response from all known sub-clusters. This
+   * is used together with the sub-cluster timeout to assemble cluster-wide
+   * info entries (e.g. AvailableResource, NumClusterNodes) in the allocate
+   * response back to the AM.
+   */
+  private Map<SubClusterId, AllocateResponse> lastSCResponse;
+
+  /**
+   * The async UAM registration result that is not consumed yet.
+   */
   private Map<SubClusterId, RegisterApplicationMasterResponse>
       uamRegistrations;
 
   // For unit test synchronization
@@ -216,6 +230,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
   private SubClusterResolver subClusterResolver;
 
+  /**
+   * Records the last time a successful heartbeat response was received from
+   * a known sub-cluster. lastSCResponseTime.keySet() should be in sync with
+   * uamPool.getAllUAMIds().
+   */
+  private Map<SubClusterId, Long> lastSCResponseTime;
+  private long subClusterTimeOut;
+
+  private long lastAMHeartbeatTime;
+
   /** The policy used to split requests among sub-clusters. */
   private FederationAMRMProxyPolicy policyInterpreter;
@@ -232,6 +256,7 @@ public FederationInterceptor() {
     this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
     this.asyncResponseSink = new ConcurrentHashMap<>();
+    this.lastSCResponse = new ConcurrentHashMap<>();
     this.uamRegistrations = new ConcurrentHashMap<>();
     this.uamRegisterFutures = new ConcurrentHashMap<>();
     this.threadpool = Executors.newCachedThreadPool();
@@ -241,6 +266,8 @@ public FederationInterceptor() {
     this.amRegistrationResponse = null;
     this.justRecovered = false;
     this.finishAMCalled = false;
+    this.lastSCResponseTime = new ConcurrentHashMap<>();
+    this.lastAMHeartbeatTime = this.clock.getTime();
   }
 
   /**
@@ -310,6 +337,19 @@ public void init(AMRMProxyApplicationContext appContext) {
     this.heartbeatMaxWaitTimeMs =
         conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
             YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
+
+    this.subClusterTimeOut =
+        conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+            YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+    if (this.subClusterTimeOut <= 0) {
+      LOG.info(
+          "{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+          this.subClusterTimeOut,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+      this.subClusterTimeOut =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
+    }
   }
@@ -394,6 +434,10 @@ public void recover(Map<String, byte[]> recoveredDataMap) {
           this.uamPool.registerApplicationMaster(subClusterId.getId(),
               this.amRegistrationRequest);
 
+          // Set sub-cluster to be timed out initially
+          lastSCResponseTime.put(subClusterId,
+              clock.getTime() - subClusterTimeOut);
+
           // Running containers from secondary RMs
           for (Container container : response
               .getContainersFromPreviousAttempts()) {
@@ -580,6 +624,7 @@ public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
     Preconditions.checkArgument(this.policyInterpreter != null,
         "Allocate should be called after registerApplicationMaster");
+    this.lastAMHeartbeatTime = this.clock.getTime();
 
     if (this.justRecovered) {
       throw new ApplicationMasterNotRegisteredException(
@@ -644,8 +689,7 @@ public AllocateResponse allocate(AllocateRequest request)
       }
 
       // Prepare the response to AM
-      AllocateResponse response =
-          RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+      AllocateResponse response = generateBaseAllocationResponse();
 
       // Merge all responses from response sink
       mergeAllocateResponses(response);
@@ -970,6 +1014,10 @@ public RegisterApplicationMasterResponse call() throws Exception {
               response = uamPool.registerApplicationMaster(
                   subClusterId.getId(), amRegistrationRequest);
 
+              // Set sub-cluster to be timed out initially
+              lastSCResponseTime.put(subClusterId,
+                  clock.getTime() - subClusterTimeOut);
+
               if (response != null
                   && response.getContainersFromPreviousAttempts() != null) {
                 cacheAllocatedContainers(
@@ -1172,6 +1220,10 @@ private List<SubClusterId> registerAndAllocateWithNewSubClusters(
       if (!subClusterId.equals(this.homeSubClusterId)
           && !this.uamPool.hasUAMId(subClusterId.getId())) {
         newSubClusters.add(subClusterId);
+
+        // Set sub-cluster to be timed out initially
+        lastSCResponseTime.put(subClusterId,
+            clock.getTime() - subClusterTimeOut);
       }
     }
@@ -1244,6 +1296,38 @@ public void run() {
     return newSubClusters;
   }
 
+  /**
+   * Prepare the base allocation response. Use lastSCResponse and
+   * lastSCResponseTime to assemble entries about cluster-wide info, e.g.
+   * AvailableResource, NumClusterNodes.
+   */
+  protected AllocateResponse generateBaseAllocationResponse() {
+    AllocateResponse baseResponse =
+        RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+
+    baseResponse.setAvailableResources(Resource.newInstance(0, 0));
+    baseResponse.setNumClusterNodes(0);
+
+    Set<SubClusterId> expiredSC = getTimedOutSCs(false);
+    for (Entry<SubClusterId, AllocateResponse> entry : lastSCResponse
+        .entrySet()) {
+      if (expiredSC.contains(entry.getKey())) {
+        // Skip expired sub-clusters
+        continue;
+      }
+      AllocateResponse response = entry.getValue();
+
+      if (response.getAvailableResources() != null) {
+        baseResponse.setAvailableResources(
+            Resources.add(baseResponse.getAvailableResources(),
+                response.getAvailableResources()));
+      }
+      baseResponse.setNumClusterNodes(
+          baseResponse.getNumClusterNodes() + response.getNumClusterNodes());
+    }
+    return baseResponse;
+  }
+
   /**
    * Merge the responses from all sub-clusters that we received asynchronously
    * and keeps track of the containers received from each sub-cluster resource
@@ -1345,17 +1429,6 @@ private void mergeAllocateResponse(AllocateResponse homeResponse,
       }
     }
-
-    if (otherResponse.getAvailableResources() != null) {
-      if (homeResponse.getAvailableResources() != null) {
-        homeResponse.setAvailableResources(
-            Resources.add(homeResponse.getAvailableResources(),
-                otherResponse.getAvailableResources()));
-      } else {
-        homeResponse
-            .setAvailableResources(otherResponse.getAvailableResources());
-      }
-    }
     if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
       if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
         homeResponse.getCompletedContainersStatuses()
             .addAll(
@@ -1520,6 +1593,29 @@ private static AllocateRequest createAllocateRequest() {
     return request;
   }
 
+  protected Set<SubClusterId> getTimedOutSCs(boolean verbose) {
+    Set<SubClusterId> timedOutSCs = new HashSet<>();
+    for (Map.Entry<SubClusterId, Long> entry : this.lastSCResponseTime
+        .entrySet()) {
+      if (entry.getValue() > this.lastAMHeartbeatTime) {
+        // The AM hasn't heartbeated to us (and thus we haven't to any SC)
+        // for a long time; don't consider the SC as timed out
+        continue;
+      }
+      long duration = this.clock.getTime() - entry.getValue();
+      if (duration > this.subClusterTimeOut) {
+        if (verbose) {
+          LOG.warn(
+              "Subcluster {} doesn't have a successful heartbeat"
+                  + " for {} seconds for {}",
+              entry.getKey(), (double) duration / 1000, this.attemptId);
+        }
+        timedOutSCs.add(entry.getKey());
+      }
+    }
+    return timedOutSCs;
+  }
+
   /**
    * Check to see if the specified containerId exists in the cache and log an
    * error if not found.
@@ -1553,7 +1649,8 @@ private boolean warnIfNotExists(ContainerId containerId, String actionName) {
    */
   protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
       List<ResourceRequest> askList) throws YarnException {
-    return this.policyInterpreter.splitResourceRequests(askList);
+    return policyInterpreter.splitResourceRequests(askList,
+        getTimedOutSCs(true));
   }
 
   @VisibleForTesting
@@ -1602,6 +1699,8 @@ public void callback(AllocateResponse response) {
           // Notify main thread about the response arrival
           asyncResponseSink.notifyAll();
         }
+        lastSCResponse.put(subClusterId, response);
+        lastSCResponseTime.put(subClusterId, clock.getTime());
 
         // Notify policy of allocate response
         try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index ec75cfd55c1..48b7bf57970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -160,6 +160,10 @@ protected YarnConfiguration createConfiguration() {
     // Disable StateStoreFacade cache
     conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
 
+    // Set sub-cluster timeout to 500ms
+    conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+        500);
+
     return conf;
   }
 
@@ -568,6 +572,8 @@ public Object run() throws Exception {
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        // SC1 should be initialized to be timed out
+        Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
 
         // The first allocate call expects a fail-over exception and re-register
         try {
@@ -740,6 +746,60 @@ public void testAllocateResponse() throws Exception {
     Assert.assertEquals(1, response.getUpdateErrors().size());
   }
 
+  @Test
+  public void testSubClusterTimeOut() throws Exception {
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        // Register the application for the first time
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(0);
+        registerReq.setTrackingUrl("");
+
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+        lastResponseId = 0;
+
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+        getContainersAndAssert(1, 1);
+
+        AllocateResponse allocateResponse =
+            interceptor.generateBaseAllocationResponse();
+        Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
+        Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
+
+        // Let all SCs (home and SC-1) time out, without an allocate from AM
+        Thread.sleep(800);
+
+        // Should not be considered timed out, because there's no recent AM
+        // heartbeat
+        allocateResponse = interceptor.generateBaseAllocationResponse();
+        Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
+        Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
+
+        // Generate a duplicate heartbeat from the AM, so that it won't
+        // actually trigger a heartbeat to all SCs
+        AllocateRequest allocateRequest =
+            Records.newRecord(AllocateRequest.class);
+        // Set to lastResponseId - 1 so that it will be considered a duplicate
+        // heartbeat and thus not forwarded to all SCs
+        allocateRequest.setResponseId(lastResponseId - 1);
+        interceptor.allocate(allocateRequest);
+
+        // Should now be considered timed out
+        allocateResponse = interceptor.generateBaseAllocationResponse();
+        Assert.assertEquals(0, allocateResponse.getNumClusterNodes());
+        Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size());
+        return null;
+      }
+    });
+  }
+
   @Test
   public void testSecondAttempt() throws Exception {
     final RegisterApplicationMasterRequest registerReq =
@@ -803,6 +863,8 @@ public Object run() throws Exception {
         int numberOfContainers = 3;
         // Should re-attach secondaries and get the three running containers
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        // SC1 should be initialized to be timed out
+        Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
         Assert.assertEquals(numberOfContainers,
             registerResponse.getContainersFromPreviousAttempts().size());
@@ -831,5 +893,4 @@ public Object run() throws Exception {
       }
     });
   }
-
 }
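
Note for out-of-tree users of the FederationAMRMProxyPolicy interface: the
splitResourceRequests() signature change above is incompatible, so custom
policy implementations must be updated to accept the new timedOutSubClusters
argument. A minimal way to honor it is to prune the computed routing map
before returning it, mirroring the tmpSCSet.removeAll(timedOutSubClusters)
step in LocalityMulticastAMRMProxyPolicy. The sketch below is illustrative
only; the helper class TimedOutSubClusterFilter is hypothetical and not part
of this patch.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

/** Hypothetical helper for custom policies; not part of this patch. */
public final class TimedOutSubClusterFilter {

  private TimedOutSubClusterFilter() {
  }

  /**
   * Drop asks routed to sub-clusters that have not had a successful
   * heartbeat response recently, mirroring what
   * LocalityMulticastAMRMProxyPolicy does via
   * tmpSCSet.removeAll(timedOutSubClusters).
   */
  public static Map<SubClusterId, List<ResourceRequest>> filterTimedOut(
      Map<SubClusterId, List<ResourceRequest>> answer,
      Set<SubClusterId> timedOutSubClusters) {
    Map<SubClusterId, List<ResourceRequest>> filtered = new HashMap<>();
    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry
        : answer.entrySet()) {
      // Keep only the routes whose target sub-cluster is still heartbeating
      if (!timedOutSubClusters.contains(entry.getKey())) {
        filtered.put(entry.getKey(), entry.getValue());
      }
    }
    return filtered;
  }
}

A policy could apply such a filter as the last step of
splitResourceRequests(), so that asks are never routed to a sub-cluster that
FederationInterceptor already considers unreachable.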