YARN-8984. AMRMClient#OutstandingSchedRequests leaks when AllocationTags is null or empty. Contributed by Yang Wang.

This commit is contained in:
Weiwei Yang 2018-11-22 16:52:29 +08:00
parent f207e30142
commit 176bb3f812
4 changed files with 203 additions and 59 deletions

View File

@ -1037,6 +1037,11 @@ RemoteRequestsTable<T> getTable(long allocationRequestId) {
return remoteRequests.get(Long.valueOf(allocationRequestId));
}
/**
 * Exposes the internal map of outstanding {@code SchedulingRequest}s,
 * keyed by allocation-tag set, so tests can verify that satisfied
 * requests are removed and do not leak (YARN-8984).
 *
 * @return the live internal outstanding-request map (not a copy)
 */
@VisibleForTesting
Map<Set<String>, List<SchedulingRequest>> getOutstandingSchedRequests() {
return outstandingSchedRequests;
}
RemoteRequestsTable<T> putTable(long allocationRequestId,
RemoteRequestsTable<T> table) {
return remoteRequests.put(Long.valueOf(allocationRequestId), table);

View File

@ -196,7 +196,9 @@ Collections.<String, LocalResource> emptyMap(),
@After
public void teardown() throws YarnException, IOException {
yarnClient.killApplication(attemptId.getApplicationId());
if (yarnClient != null) {
yarnClient.killApplication(attemptId.getApplicationId());
}
attemptId = null;
if (yarnClient != null &&

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@ -58,66 +59,46 @@
*/
public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
@Test(timeout=60000)
public void testAMRMClientWithPlacementConstraints()
throws Exception {
// we have to create a new instance of MiniYARNCluster to avoid SASL qop
// mismatches between client and server
teardown();
private List<Container> allocatedContainers = null;
private List<RejectedSchedulingRequest> rejectedSchedulingRequests = null;
private Map<Set<String>, PlacementConstraint> pcMapping = null;
@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
createClusterAndStartApplication(conf);
AMRMClient<AMRMClient.ContainerRequest> amClient =
AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
final List<Container> allocatedContainers = new ArrayList<>();
final List<RejectedSchedulingRequest> rejectedSchedulingRequests =
new ArrayList<>();
AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000,
new AMRMClientAsync.AbstractCallbackHandler() {
@Override
public void onContainersAllocated(List<Container> containers) {
allocatedContainers.addAll(containers);
}
@Override
public void onRequestsRejected(
List<RejectedSchedulingRequest> rejReqs) {
rejectedSchedulingRequests.addAll(rejReqs);
}
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {}
@Override
public void onContainersUpdated(List<UpdatedContainer> containers) {}
@Override
public void onShutdownRequest() {}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
@Override
public void onError(Throwable e) {}
@Override
public float getProgress() {
return 0.1f;
}
});
asyncClient.init(conf);
asyncClient.start();
Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>();
allocatedContainers = new ArrayList<>();
rejectedSchedulingRequests = new ArrayList<>();
pcMapping = new HashMap<>();
pcMapping.put(Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
pcMapping.put(Collections.singleton("bar"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
}
@Test(timeout=60000)
public void testAMRMClientWithPlacementConstraintsByPlacementProcessor()
throws Exception {
// we have to create a new instance of MiniYARNCluster to avoid SASL qop
// mismatches between client and server
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
createClusterAndStartApplication(conf);
allocatedContainers.clear();
rejectedSchedulingRequests.clear();
AMRMClient<AMRMClient.ContainerRequest> amClient =
AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient,
1000, new TestCallbackHandler());
asyncClient.init(conf);
asyncClient.start();
asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
// Send two types of requests - 4 with source tag "foo" have numAlloc = 1
@ -144,6 +125,15 @@ public float getProgress() {
allocatedContainers.stream().collect(
Collectors.groupingBy(Container::getNodeId));
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
((AMRMClientImpl)amClient).getOutstandingSchedRequests();
// Check the outstanding SchedulingRequests
Assert.assertEquals(2, outstandingSchedRequests.size());
Assert.assertEquals(1, outstandingSchedRequests.get(
new HashSet<>(Collections.singletonList("foo"))).size());
Assert.assertEquals(1, outstandingSchedRequests.get(
new HashSet<>(Collections.singletonList("bar"))).size());
// Ensure 2 containers allocated per node.
// Each node should have a "foo" and a "bar" container.
Assert.assertEquals(3, containersPerNode.entrySet().size());
@ -169,6 +159,140 @@ public float getProgress() {
asyncClient.stop();
}
/**
 * End-to-end check that placement constraints are honored when handled by
 * the scheduler (SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER), and that
 * outstandingSchedRequests is trimmed correctly — including for a request
 * with an empty tag set, the leak case from YARN-8984.
 */
@Test(timeout=60000)
public void testAMRMClientWithPlacementConstraintsByScheduler()
throws Exception {
// we have to create a new instance of MiniYARNCluster to avoid SASL qop
// mismatches between client and server
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
createClusterAndStartApplication(conf);
// reset shared collections populated by TestCallbackHandler in setup()
allocatedContainers.clear();
rejectedSchedulingRequests.clear();
AMRMClient<AMRMClient.ContainerRequest> amClient =
AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient,
1000, new TestCallbackHandler());
asyncClient.init(conf);
asyncClient.start();
// pcMapping (anti-affinity on "foo" and "bar") is built in setup()
asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
// Send two types of requests - 4 with source tag "foo" have numAlloc = 1
// and 1 with source tag "bar" and has numAlloc = 4. Both should be
// handled similarly. i.e: Since there are only 3 nodes,
// 2 schedulingRequests - 1 with source tag "foo" on one with source
// tag "bar" should get rejected.
asyncClient.addSchedulingRequests(
Arrays.asList(
// 4 reqs with numAlloc = 1
schedulingRequest(1, 1, 1, 1, 512, "foo"),
schedulingRequest(1, 1, 2, 1, 512, "foo"),
schedulingRequest(1, 1, 3, 1, 512, "foo"),
schedulingRequest(1, 1, 4, 1, 512, "foo"),
// 1 req with numAlloc = 4
schedulingRequest(4, 1, 5, 1, 512, "bar"),
// 1 empty tag
schedulingRequest(1, 1, 6, 1, 512, new HashSet<>())));
// kick the scheduler
waitForContainerAllocation(allocatedContainers,
rejectedSchedulingRequests, 7, 0);
Assert.assertEquals(7, allocatedContainers.size());
Map<NodeId, List<Container>> containersPerNode =
allocatedContainers.stream().collect(
Collectors.groupingBy(Container::getNodeId));
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
((AMRMClientImpl)amClient).getOutstandingSchedRequests();
// Check the outstanding SchedulingRequests
// Three keys expected: {foo}, {bar}, and the empty set; the empty-set
// entry must be fully drained once its container is allocated.
Assert.assertEquals(3, outstandingSchedRequests.size());
Assert.assertEquals(1, outstandingSchedRequests.get(
new HashSet<>(Collections.singletonList("foo"))).size());
Assert.assertEquals(1, outstandingSchedRequests.get(
new HashSet<>(Collections.singletonList("bar"))).size());
Assert.assertEquals(0, outstandingSchedRequests.get(
new HashSet<String>()).size());
// Each node should have a "foo" and a "bar" container.
Assert.assertEquals(3, containersPerNode.entrySet().size());
HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar"));
containersPerNode.entrySet().forEach(
x ->
Assert.assertEquals(
srcTags,
x.getValue()
.stream()
// ignore the untagged container when checking anti-affinity
.filter(y -> !y.getAllocationTags().isEmpty())
.map(y -> y.getAllocationTags().iterator().next())
.collect(Collectors.toSet()))
);
// The rejected requests were not set by scheduler
Assert.assertEquals(0, rejectedSchedulingRequests.size());
asyncClient.stop();
}
@Test
/*
 * Verifies the three ways a SchedulingRequest can end up under the empty
 * HashSet key of outstandingSchedRequests:
 * 1. No tags set at all
 * 2. An empty set, e.g. ImmutableSet.of() or new HashSet<>()
 * 3. Tags explicitly set to null
 * All three must collapse to the same empty-set key so that they can be
 * matched and removed when containers are allocated (YARN-8984).
 */
public void testEmptyKeyOfOutstandingSchedRequests() {
AMRMClient<AMRMClient.ContainerRequest> amClient =
AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
// case 3: a null tag set
HashSet<String> schedRequest = null;
amClient.addSchedulingRequests(Arrays.asList(
schedulingRequest(1, 1, 1, 1, 512, ExecutionType.GUARANTEED),
schedulingRequest(1, 1, 2, 1, 512, new HashSet<>()),
schedulingRequest(1, 1, 3, 1, 512, schedRequest)));
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
((AMRMClientImpl)amClient).getOutstandingSchedRequests();
// all three requests should share the single empty-set key
Assert.assertEquals(1, outstandingSchedRequests.size());
Assert.assertEquals(3, outstandingSchedRequests
.get(new HashSet<String>()).size());
}
/**
 * Async callback handler shared by the placement-constraint tests.
 * Records allocated containers and rejected scheduling requests into the
 * enclosing test's lists so assertions can be made after allocation;
 * all other callbacks are intentional no-ops.
 */
private class TestCallbackHandler extends
AMRMClientAsync.AbstractCallbackHandler {
@Override
public void onContainersAllocated(List<Container> containers) {
// accumulate for waitForContainerAllocation() and later assertions
allocatedContainers.addAll(containers);
}
@Override
public void onRequestsRejected(
List<RejectedSchedulingRequest> rejReqs) {
// accumulate rejections; some tests expect a specific rejected count
rejectedSchedulingRequests.addAll(rejReqs);
}
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {}
@Override
public void onContainersUpdated(List<UpdatedContainer> containers) {}
@Override
public void onShutdownRequest() {}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
@Override
public void onError(Throwable e) {}
@Override
public float getProgress() {
// fixed nominal progress; value is irrelevant to these tests
return 0.1f;
}
}
private static void waitForContainerAllocation(
List<Container> allocatedContainers,
List<RejectedSchedulingRequest> rejectedRequests,
@ -186,16 +310,30 @@ private static void waitForContainerAllocation(
private static SchedulingRequest schedulingRequest(int numAllocations,
int priority, long allocReqId, int cores, int mem, String... tags) {
return schedulingRequest(numAllocations, priority, allocReqId, cores, mem,
ExecutionType.GUARANTEED, tags);
ExecutionType.GUARANTEED, new HashSet<>(Arrays.asList(tags)));
}
/**
 * Builds a GUARANTEED SchedulingRequest with an explicit tag set.
 * Unlike the varargs overload, this allows passing an empty set or null
 * to exercise the empty-key handling of outstandingSchedRequests.
 *
 * @param numAllocations number of containers requested
 * @param priority request priority
 * @param allocReqId allocation request id
 * @param cores vcores per container
 * @param mem memory (MB) per container
 * @param tags allocation tags; may be empty or null
 */
private static SchedulingRequest schedulingRequest(int numAllocations,
int priority, long allocReqId, int cores, int mem, Set<String> tags) {
return schedulingRequest(numAllocations,
priority, allocReqId, cores, mem, ExecutionType.GUARANTEED, tags);
}
private static SchedulingRequest schedulingRequest(int numAllocations,
int priority, long allocReqId, int cores, int mem,
ExecutionType execType, String... tags) {
ExecutionType execType, Set<String> tags) {
SchedulingRequest schedRequest = schedulingRequest(numAllocations,
priority, allocReqId, cores, mem, execType);
schedRequest.setAllocationTags(tags);
return schedRequest;
}
private static SchedulingRequest schedulingRequest(int numAllocations,
int priority, long allocReqId, int cores, int mem,
ExecutionType execType) {
return SchedulingRequest.newBuilder()
.priority(Priority.newInstance(priority))
.allocationRequestId(allocReqId)
.allocationTags(new HashSet<>(Arrays.asList(tags)))
.executionType(ExecutionTypeRequest.newInstance(execType, true))
.resourceSizing(
ResourceSizing.newInstance(numAllocations,

View File

@ -197,8 +197,7 @@ public static void removeFromOutstandingSchedulingRequests(
return;
}
for (Container container : containers) {
if (container.getAllocationTags() != null
&& !container.getAllocationTags().isEmpty()) {
if (container.getAllocationTags() != null) {
List<SchedulingRequest> schedReqs =
outstandingSchedRequests.get(container.getAllocationTags());
if (schedReqs != null && !schedReqs.isEmpty()) {