From 64baa9ec89fb5e9ed157c6a4a6cdff07f48a2b03 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Fri, 29 Jun 2018 11:47:30 -0700 Subject: [PATCH] YARN-8481. AMRMProxyPolicies should accept heartbeat response from new/unknown subclusters. Contributed by Botong Huang. --- .../policies/amrmproxy/BroadcastAMRMProxyPolicy.java | 11 ----------- .../policies/amrmproxy/RejectAMRMProxyPolicy.java | 4 ---- .../TestBroadcastAMRMProxyFederationPolicy.java | 11 +++-------- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java index 679f4d5fa41..7fddb8ea952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -19,10 +19,8 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -30,7 +28,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -40,8 +37,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; */ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { - private Set knownClusterIds = new HashSet<>(); - @Override public void reinitialize( FederationPolicyInitializationContext policyContext) @@ -65,7 +60,6 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { // simply broadcast the resource request to all sub-clusters for (SubClusterId subClusterId : activeSubclusters.keySet()) { answer.put(subClusterId, resourceRequests); - knownClusterIds.add(subClusterId); } return answer; @@ -74,11 +68,6 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { @Override public void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) throws YarnException { - if (!knownClusterIds.contains(subClusterId)) { - throw new UnknownSubclusterException( - "The response is received from a subcluster that is unknown to this " - + "policy."); - } // stateless policy does not care about responses } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java index 3783df645c5..450060671c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java @@ -18,10 +18,8 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -38,8 +36,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; */ public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy { - private Set knownClusterIds = new HashSet<>(); - @Override public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java index a21f53dc924..df5da85a915 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java @@ -89,7 +89,7 @@ public class TestBroadcastAMRMProxyFederationPolicy } @Test - public void testNotifyOfResponse() throws Exception { + public void testNotifyOfResponseFromUnknownSubCluster() throws Exception { String[] hosts = new String[] {"host1", "host2" }; List resourceRequests = FederationPoliciesTestUtil .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); @@ -97,13 +97,8 @@ public class TestBroadcastAMRMProxyFederationPolicy ((FederationAMRMProxyPolicy) getPolicy()) .splitResourceRequests(resourceRequests); - try { - ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( - SubClusterId.newInstance("sc3"), mock(AllocateResponse.class)); - Assert.fail(); - } catch (FederationPolicyException f) { - System.out.println("Expected: " + f.getMessage()); - } + ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( + SubClusterId.newInstance("sc3"), mock(AllocateResponse.class)); ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));