YARN-8481. AMRMProxyPolicies should accept heartbeat response from new/unknown subclusters. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-06-29 11:47:30 -07:00
parent 14c7dc3c1e
commit 64baa9ec89
3 changed files with 3 additions and 23 deletions

View File

@ -19,10 +19,8 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -30,7 +28,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -40,8 +37,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
*/ */
public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private Set<SubClusterId> knownClusterIds = new HashSet<>();
@Override @Override
public void reinitialize( public void reinitialize(
FederationPolicyInitializationContext policyContext) FederationPolicyInitializationContext policyContext)
@ -65,7 +60,6 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// simply broadcast the resource request to all sub-clusters // simply broadcast the resource request to all sub-clusters
for (SubClusterId subClusterId : activeSubclusters.keySet()) { for (SubClusterId subClusterId : activeSubclusters.keySet()) {
answer.put(subClusterId, resourceRequests); answer.put(subClusterId, resourceRequests);
knownClusterIds.add(subClusterId);
} }
return answer; return answer;
@ -74,11 +68,6 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
@Override @Override
public void notifyOfResponse(SubClusterId subClusterId, public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException { AllocateResponse response) throws YarnException {
if (!knownClusterIds.contains(subClusterId)) {
throw new UnknownSubclusterException(
"The response is received from a subcluster that is unknown to this "
+ "policy.");
}
// stateless policy does not care about responses // stateless policy does not care about responses
} }

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -38,8 +36,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
*/ */
public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy { public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private Set<SubClusterId> knownClusterIds = new HashSet<>();
@Override @Override
public void reinitialize(FederationPolicyInitializationContext policyContext) public void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException { throws FederationPolicyInitializationException {

View File

@ -89,7 +89,7 @@ public class TestBroadcastAMRMProxyFederationPolicy
} }
@Test @Test
public void testNotifyOfResponse() throws Exception { public void testNotifyOfResponseFromUnknownSubCluster() throws Exception {
String[] hosts = new String[] {"host1", "host2" }; String[] hosts = new String[] {"host1", "host2" };
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
@ -97,13 +97,8 @@ public class TestBroadcastAMRMProxyFederationPolicy
((FederationAMRMProxyPolicy) getPolicy()) ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests); .splitResourceRequests(resourceRequests);
try { ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
Assert.fail();
} catch (FederationPolicyException f) {
System.out.println("Expected: " + f.getMessage());
}
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse( ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc1"), mock(AllocateResponse.class)); SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));