From 89da0e990124f3b72bacf1e5afb087cf09a789d4 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Tue, 21 Aug 2018 13:04:49 -0700 Subject: [PATCH] YARN-8581. [AMRMProxy] Add sub-cluster timeout in LocalityMulticastAMRMProxyPolicy. Contributed by Botong Huang. --- .../hadoop/yarn/conf/YarnConfiguration.java | 8 +- .../conf/TestYarnConfigurationFields.java | 2 + .../LocalityMulticastAMRMProxyPolicy.java | 64 ++++++++++++- .../utils/FederationStateStoreFacade.java | 9 ++ .../TestLocalityMulticastAMRMProxyPolicy.java | 91 +++++++++++++++++-- .../utils/FederationPoliciesTestUtil.java | 7 +- 6 files changed, 162 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3b39e55a6ca..0e268059491 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2835,8 +2835,14 @@ public class YarnConfiguration extends Configuration { "org.apache.hadoop.yarn.server.federation.resolver." + "DefaultSubClusterResolverImpl"; - public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; + // AMRMProxy split-merge timeout for active sub-clusters. We will not route + // new asks to expired sub-clusters. 
+ public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = + FEDERATION_PREFIX + "amrmproxy.subcluster.timeout.ms"; + public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = + 60000; // one minute + public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX + "policy-manager"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 230d8403bbb..8d6f72f549c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -101,6 +101,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); // Federation StateStore ZK implementation configs to be ignored configurationPropsToSkipCompare.add( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index e318e3a6d93..f346a6e63a7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -126,6 +127,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private SubClusterResolver resolver; private Map headroom; + private Map lastHeartbeatTimeStamp; + private long subClusterTimeOut; private float hrAlpha; private FederationStateStoreFacade federationFacade; private AllocationBookkeeper bookkeeper; @@ -178,6 +181,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { if (headroom == null) { headroom = new ConcurrentHashMap<>(); + lastHeartbeatTimeStamp = new ConcurrentHashMap<>(); } hrAlpha = policy.getHeadroomAlpha(); @@ -185,13 +189,29 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { policyContext.getFederationStateStoreFacade(); this.homeSubcluster = policyContext.getHomeSubcluster(); + this.subClusterTimeOut = this.federationFacade.getConf().getLong( + YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); + if (this.subClusterTimeOut <= 0) { + LOG.info( + "{} configured to be {}, should be 
positive. Using default of {}.", + YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, + this.subClusterTimeOut, + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); + this.subClusterTimeOut = + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT; + } } @Override public void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) throws YarnException { - // stateless policy does not care about responses except tracking headroom - headroom.put(subClusterId, response.getAvailableResources()); + if (response.getAvailableResources() != null) { + headroom.put(subClusterId, response.getAvailableResources()); + LOG.info("Subcluster {} updated with {} memory headroom", subClusterId, + response.getAvailableResources().getMemorySize()); + } + lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis()); } @Override @@ -281,6 +301,15 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // handle all non-localized requests (ANY) splitAnyRequests(nonLocalizedRequests, bookkeeper); + for (Map.Entry> entry : bookkeeper + .getAnswer().entrySet()) { + // A new-cluster here will trigger new UAM launch, which might take a long + time. 
We don't want too many requests stuck in this UAM before it is + // ready and starts heartbeating + if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) { + lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis()); + } + } return bookkeeper.getAnswer(); } @@ -519,13 +548,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { policyWeights = weights; totPolicyWeight = 0; - // pre-compute the set of subclusters that are both active and enabled by - // the policy weights, and accumulate their total weight for (Map.Entry entry : policyWeights.entrySet()) { if (entry.getValue() > 0 && activeSubclusters.containsKey(entry.getKey())) { activeAndEnabledSC.add(entry.getKey()); - totPolicyWeight += entry.getValue(); } } @@ -535,6 +561,34 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { + "currently active we cannot forward the ResourceRequest(s)"); } + Set tmpSCSet = new HashSet<>(activeAndEnabledSC); + for (Map.Entry entry : lastHeartbeatTimeStamp + .entrySet()) { + long duration = System.currentTimeMillis() - entry.getValue(); + if (duration > subClusterTimeOut) { + LOG.warn( + "Subcluster {} does not have a success heartbeat for {}s, " + + "skip routing asks there for this request", + entry.getKey(), (double) duration / 1000); + tmpSCSet.remove(entry.getKey()); + } + } + if (tmpSCSet.size() < 1) { + LOG.warn("All active and enabled subclusters have expired last " + + "heartbeat time. 
Ignore the expiry check for this request"); + } else { + activeAndEnabledSC = tmpSCSet; + } + + LOG.info("{} subcluster active, {} subclusters active and enabled", + activeSubclusters.size(), activeAndEnabledSC.size()); + + // pre-compute the set of subclusters that are both active and enabled by + // the policy weights, and accumulate their total weight + for (SubClusterId sc : activeAndEnabledSC) { + totPolicyWeight += policyWeights.get(sc); + } + // pre-compute headroom-based weights for active/enabled subclusters for (Map.Entry r : headroom.entrySet()) { if (activeAndEnabledSC.contains(r.getKey())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 682eb1457d9..328a2d0e20b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -391,6 +391,15 @@ public final class FederationStateStoreFacade { return this.subclusterResolver; } + /** + * Get the configuration. + * + * @return configuration object + */ + public Configuration getConf() { + return this.conf; + } + /** * Helper method to create instances of Object using the class name defined in * the configuration object. 
The instances creates {@link RetryProxy} using diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index f66bbb6adf0..cf9ac5347cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -32,11 +32,13 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -106,6 +108,10 @@ public class TestLocalityMulticastAMRMProxyPolicy } private void initializePolicy() throws YarnException { + initializePolicy(new YarnConfiguration()); + } + + private void initializePolicy(Configuration conf) throws YarnException { setFederationPolicyContext(new FederationPolicyInitializationContext()); SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver(); 
getFederationPolicyContext().setFederationSubclusterResolver(resolver); @@ -116,7 +122,7 @@ public class TestLocalityMulticastAMRMProxyPolicy getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster()); FederationPoliciesTestUtil.initializePolicyContext( getFederationPolicyContext(), getPolicy(), getPolicyInfo(), - getActiveSubclusters()); + getActiveSubclusters(), conf); } @Test(expected = FederationPolicyInitializationException.class) @@ -145,7 +151,7 @@ public class TestLocalityMulticastAMRMProxyPolicy initializePolicy(); List resourceRequests = createSimpleRequest(); - prepPolicyWithHeadroom(); + prepPolicyWithHeadroom(true); Map> response = ((FederationAMRMProxyPolicy) getPolicy()) @@ -205,7 +211,7 @@ public class TestLocalityMulticastAMRMProxyPolicy int numRR = 1000; List resourceRequests = createLargeRandomList(numRR); - prepPolicyWithHeadroom(); + prepPolicyWithHeadroom(true); int numIterations = 1000; long tstart = System.currentTimeMillis(); @@ -233,7 +239,7 @@ public class TestLocalityMulticastAMRMProxyPolicy List resourceRequests = createZeroSizedANYRequest(); // this receives responses from sc0,sc1,sc2 - prepPolicyWithHeadroom(); + prepPolicyWithHeadroom(true); Map> response = ((FederationAMRMProxyPolicy) getPolicy()) @@ -269,7 +275,7 @@ public class TestLocalityMulticastAMRMProxyPolicy initializePolicy(); List resourceRequests = createSimpleRequest(); - prepPolicyWithHeadroom(); + prepPolicyWithHeadroom(true); Map> response = ((FederationAMRMProxyPolicy) getPolicy()) @@ -292,10 +298,14 @@ public class TestLocalityMulticastAMRMProxyPolicy checkTotalContainerAllocation(response, 100); } - private void prepPolicyWithHeadroom() throws YarnException { + private void prepPolicyWithHeadroom(boolean setSubCluster0) + throws YarnException { AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40); - ((FederationAMRMProxyPolicy) getPolicy()) - .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar); + + if (setSubCluster0) { + 
((FederationAMRMProxyPolicy) getPolicy()) + .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar); + } ar = getAllocateResponseWithTargetHeadroom(0); ((FederationAMRMProxyPolicy) getPolicy()) @@ -333,7 +343,7 @@ public class TestLocalityMulticastAMRMProxyPolicy FederationPoliciesTestUtil.initializePolicyContext( getFederationPolicyContext(), getPolicy(), getPolicyInfo(), - getActiveSubclusters()); + getActiveSubclusters(), new Configuration()); List resourceRequests = createComplexRequest(); @@ -669,7 +679,7 @@ public class TestLocalityMulticastAMRMProxyPolicy List resourceRequests = new ArrayList<>(); // Initialize the headroom map - prepPolicyWithHeadroom(); + prepPolicyWithHeadroom(true); // Cancel at ANY level only resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, @@ -716,4 +726,65 @@ public class TestLocalityMulticastAMRMProxyPolicy checkExpectedAllocation(response, "subcluster5", 1, 25); checkTotalContainerAllocation(response, 100); } + + @Test + public void testSubClusterExpiry() throws Exception { + + // Tests how the headroom info are used to split based on the capacity + // each RM claims to give us. 
+ // Configure policy to be 100% headroom based + getPolicyInfo().setHeadroomAlpha(1.0f); + + YarnConfiguration conf = new YarnConfiguration(); + // Set expiry to 500ms + conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, + 500); + + initializePolicy(conf); + List resourceRequests = createSimpleRequest(); + + // Update the response timestamp for the first time + prepPolicyWithHeadroom(true); + + Map> response = + ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // pretty print requests + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + /* + * based on headroom, we expect 75 containers to go to subcluster0 (60) and + * subcluster2 (15) according to the advertised headroom (40 and 10), no + * containers for subcluster1 as it advertises zero headroom, and 25 to + * subcluster5 which has unknown headroom, and so it gets 1/4th of the load + */ + checkExpectedAllocation(response, "subcluster0", 1, 60); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 15); + checkExpectedAllocation(response, "subcluster5", 1, 25); + checkTotalContainerAllocation(response, 100); + + Thread.sleep(800); + + // Update the response timestamp for the second time, skipping sc0 and sc5 + prepPolicyWithHeadroom(false); + + response = ((FederationAMRMProxyPolicy) getPolicy()) + .splitResourceRequests(resourceRequests); + + // pretty print requests + prettyPrintRequests(response); + + validateSplit(response, resourceRequests); + + checkExpectedAllocation(response, "subcluster0", 1, -1); + checkExpectedAllocation(response, "subcluster1", 1, -1); + checkExpectedAllocation(response, "subcluster2", 1, 100); + checkExpectedAllocation(response, "subcluster5", 1, -1); + checkTotalContainerAllocation(response, 100); + } + } \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index 4954197ecfe..4361367246b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -117,7 +117,7 @@ public final class FederationPoliciesTestUtil { public static void initializePolicyContext( FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy policy, WeightedPolicyInfo policyInfo, - Map activeSubclusters) + Map activeSubclusters, Configuration conf) throws YarnException { ByteBuffer buf = policyInfo.toByteBuffer(); fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration @@ -134,7 +134,7 @@ public final class FederationPoliciesTestUtil { when(fss.getSubClusters(any(GetSubClustersInfoRequest.class))) .thenReturn(response); - facade.reinitialize(fss, new Configuration()); + facade.reinitialize(fss, conf); fpc.setFederationStateStoreFacade(facade); policy.reinitialize(fpc); } @@ -146,7 +146,8 @@ public final class FederationPoliciesTestUtil { FederationPolicyInitializationContext context = new FederationPolicyInitializationContext(null, initResolver(), initFacade(), SubClusterId.newInstance("homesubcluster")); - initializePolicyContext(context, policy, policyInfo, activeSubclusters); + initializePolicyContext(context, policy, policyInfo, activeSubclusters, + new Configuration()); } /**