YARN-6370. Properly handle rack requests for non-active subclusters in LocalityMulticastAMRMProxyPolicy. (Contributed by Botong Huang via curino).

This commit is contained in:
Carlo Curino 2017-03-22 13:53:47 -07:00
parent 51aeb2ce0c
commit ce419881c3
2 changed files with 41 additions and 18 deletions

View File

@ -261,7 +261,11 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// If home-subcluster is not active, ignore node/rack request // If home-subcluster is not active, ignore node/rack request
if (bookkeeper.isActiveAndEnabled(homeSubcluster)) { if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
if (targetIds != null && targetIds.size() > 0) {
bookkeeper.addRackRR(homeSubcluster, rr);
} else {
bookkeeper.addLocalizedNodeRR(homeSubcluster, rr); bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
}
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are " LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "

View File

@ -339,19 +339,20 @@ public class TestLocalityMulticastAMRMProxyPolicy
validateSplit(response, resourceRequests); validateSplit(response, resourceRequests);
prettyPrintRequests(response); prettyPrintRequests(response);
// we expect 4 entry for home subcluster (3 for request-id 4, and a part // we expect 7 entries for home subcluster (2 for request-id 4, 3 for
// of the broadcast of request-id 2 // request-id 5, and a part of the broadcast of request-id 2
checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23); checkExpectedAllocation(response, getHomeSubCluster().getId(), 7, 29);
// for subcluster0 we expect 3 entry from request-id 0, and 3 from // for subcluster0 we expect 10 entries, 3 from request-id 0, and 3 from
// request-id 3, as well as part of the request-id 2 broadast // request-id 3, 3 entries from request-id 5, as well as part of the
checkExpectedAllocation(response, "subcluster0", 7, 26); // request-id 2 broadast
checkExpectedAllocation(response, "subcluster0", 10, 32);
// we expect 5 entry for subcluster1 (4 from request-id 1, and part // we expect 5 entries for subcluster1 (4 from request-id 1, and part
// of the broadcast of request-id 2 // of the broadcast of request-id 2
checkExpectedAllocation(response, "subcluster1", 5, 26); checkExpectedAllocation(response, "subcluster1", 5, 26);
// sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the // sub-cluster 2 should contain 3 entries from request-id 1 and 1 from the
// broadcast of request-id 2, and no request-id 0 // broadcast of request-id 2, and no request-id 0
checkExpectedAllocation(response, "subcluster2", 4, 23); checkExpectedAllocation(response, "subcluster2", 4, 23);
@ -364,28 +365,33 @@ public class TestLocalityMulticastAMRMProxyPolicy
// check that the allocations that show up are what expected // check that the allocations that show up are what expected
for (ResourceRequest rr : response.get(getHomeSubCluster())) { for (ResourceRequest rr : response.get(getHomeSubCluster())) {
Assert.assertTrue(rr.getAllocationRequestId() == 4L Assert.assertTrue(
|| rr.getAllocationRequestId() == 2L); rr.getAllocationRequestId() == 2L || rr.getAllocationRequestId() == 4L
} || rr.getAllocationRequestId() == 5L);
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
} }
List<ResourceRequest> rrs = List<ResourceRequest> rrs =
response.get(SubClusterId.newInstance("subcluster0")); response.get(SubClusterId.newInstance("subcluster0"));
for (ResourceRequest rr : rrs) { for (ResourceRequest rr : rrs) {
Assert.assertTrue(rr.getAllocationRequestId() != 1L); Assert.assertTrue(rr.getAllocationRequestId() != 1L);
Assert.assertTrue(rr.getAllocationRequestId() != 4L);
}
for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster1"))) {
Assert.assertTrue(rr.getAllocationRequestId() == 1L
|| rr.getAllocationRequestId() == 2L);
} }
for (ResourceRequest rr : response for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster2"))) { .get(SubClusterId.newInstance("subcluster2"))) {
Assert.assertTrue(rr.getAllocationRequestId() != 0L); Assert.assertTrue(rr.getAllocationRequestId() == 1L
|| rr.getAllocationRequestId() == 2L);
} }
for (ResourceRequest rr : response for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster5"))) { .get(SubClusterId.newInstance("subcluster5"))) {
Assert.assertTrue(rr.getAllocationRequestId() >= 2); Assert.assertTrue(rr.getAllocationRequestId() == 2);
Assert.assertTrue(rr.getRelaxLocality()); Assert.assertTrue(rr.getRelaxLocality());
} }
} }
@ -555,7 +561,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
out.add(FederationPoliciesTestUtil.createResourceRequest(1L, out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster2-rack3", 1024, 1, 1, 1, null, false)); "subcluster2-rack3", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L, out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
ResourceRequest.ANY, 1024, 1, 1, 2, null, false)); ResourceRequest.ANY, 1024, 1, 1, 3, null, false));
// create a non-local ANY request that can span anything // create a non-local ANY request that can span anything
out.add(FederationPoliciesTestUtil.createResourceRequest(2L, out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
@ -578,6 +584,19 @@ public class TestLocalityMulticastAMRMProxyPolicy
out.add(FederationPoliciesTestUtil.createResourceRequest(4L, out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, false)); ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
// create a request of two hosts, an unknown node and a known node, both in
// a known rack, and expect the unknown node to show up in homesubcluster
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
"subcluster0-rack0-host0", 1024, 1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
"subcluster0-rack0", 1024, 1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "node4", 1024,
1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "rack2", 1024,
1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
ResourceRequest.ANY, 1024, 1, 1, 4, null, false));
return out; return out;
} }
} }