YARN-5540. Scheduler spends too much time looking at empty priorities. Contributed by Jason Lowe

(cherry picked from commit 7558dbbb48)
This commit is contained in:
Jason Lowe 2016-09-19 20:31:35 +00:00
parent 9942ca2bf0
commit 035f5f8f1d
2 changed files with 118 additions and 43 deletions

View File

@ -26,8 +26,8 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -59,7 +59,6 @@
public class AppSchedulingInfo { public class AppSchedulingInfo {
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
private static final int EPOCH_BIT_SHIFT = 40;
private final ApplicationId applicationId; private final ApplicationId applicationId;
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
@ -79,7 +78,8 @@ public class AppSchedulingInfo {
private Set<String> requestedPartitions = new HashSet<>(); private Set<String> requestedPartitions = new HashSet<>();
final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>(); private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
schedulerKeys = new ConcurrentSkipListMap<>();
final Map<SchedulerRequestKey, Map<String, ResourceRequest>> final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
resourceRequestMap = new ConcurrentHashMap<>(); resourceRequestMap = new ConcurrentHashMap<>();
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId, final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
@ -236,6 +236,7 @@ private void insertIncreaseRequest(SchedContainerChangeRequest request) {
if (null == requestsOnNodeWithPriority) { if (null == requestsOnNodeWithPriority) {
requestsOnNodeWithPriority = new TreeMap<>(); requestsOnNodeWithPriority = new TreeMap<>();
requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority); requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
incrementSchedulerKeyReference(schedulerKey);
} }
requestsOnNodeWithPriority.put(containerId, request); requestsOnNodeWithPriority.put(containerId, request);
@ -250,9 +251,28 @@ private void insertIncreaseRequest(SchedContainerChangeRequest request) {
LOG.debug("Added increase request:" + request.getContainerId() LOG.debug("Added increase request:" + request.getContainerId()
+ " delta=" + delta); + " delta=" + delta);
} }
}
// update Scheduler Keys private void incrementSchedulerKeyReference(
schedulerKeys.add(schedulerKey); SchedulerRequestKey schedulerKey) {
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
if (schedulerKeyCount == null) {
schedulerKeys.put(schedulerKey, 1);
} else {
schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
}
}
private void decrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) {
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
if (schedulerKeyCount != null) {
if (schedulerKeyCount > 1) {
schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
} else {
schedulerKeys.remove(schedulerKey);
}
}
} }
public synchronized boolean removeIncreaseRequest(NodeId nodeId, public synchronized boolean removeIncreaseRequest(NodeId nodeId,
@ -275,6 +295,7 @@ public synchronized boolean removeIncreaseRequest(NodeId nodeId,
// remove hierarchies if it becomes empty // remove hierarchies if it becomes empty
if (requestsOnNodeWithPriority.isEmpty()) { if (requestsOnNodeWithPriority.isEmpty()) {
requestsOnNode.remove(schedulerKey); requestsOnNode.remove(schedulerKey);
decrementSchedulerKeyReference(schedulerKey);
} }
if (requestsOnNode.isEmpty()) { if (requestsOnNode.isEmpty()) {
containerIncreaseRequestMap.remove(nodeId); containerIncreaseRequestMap.remove(nodeId);
@ -341,7 +362,6 @@ public synchronized boolean updateResourceRequests(
if (asks == null) { if (asks == null) {
asks = new ConcurrentHashMap<>(); asks = new ConcurrentHashMap<>();
this.resourceRequestMap.put(schedulerKey, asks); this.resourceRequestMap.put(schedulerKey, asks);
this.schedulerKeys.add(schedulerKey);
} }
// Increment number of containers if recovering preempted resources // Increment number of containers if recovering preempted resources
@ -360,29 +380,34 @@ public synchronized boolean updateResourceRequests(
anyResourcesUpdated = true; anyResourcesUpdated = true;
// Activate application. Metrics activation is done here.
// TODO: Shouldn't we activate even if numContainers = 0?
if (request.getNumContainers() > 0) {
activeUsersManager.activateApplication(user, applicationId);
}
// Update pendingResources // Update pendingResources
updatePendingResources(lastRequest, request, queue.getMetrics()); updatePendingResources(lastRequest, request, schedulerKey,
queue.getMetrics());
} }
} }
return anyResourcesUpdated; return anyResourcesUpdated;
} }
private void updatePendingResources(ResourceRequest lastRequest, private void updatePendingResources(ResourceRequest lastRequest,
ResourceRequest request, QueueMetrics metrics) { ResourceRequest request, SchedulerRequestKey schedulerKey,
QueueMetrics metrics) {
int lastRequestContainers =
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
if (request.getNumContainers() <= 0) { if (request.getNumContainers() <= 0) {
if (lastRequestContainers >= 0) {
decrementSchedulerKeyReference(schedulerKey);
}
LOG.info("checking for deactivate of application :" LOG.info("checking for deactivate of application :"
+ this.applicationId); + this.applicationId);
checkForDeactivation(); checkForDeactivation();
} else {
// Activate application. Metrics activation is done here.
if (lastRequestContainers <= 0) {
incrementSchedulerKeyReference(schedulerKey);
activeUsersManager.activateApplication(user, applicationId);
}
} }
int lastRequestContainers =
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
Resource lastRequestCapability = Resource lastRequestCapability =
lastRequest != null ? lastRequest.getCapability() : Resources.none(); lastRequest != null ? lastRequest.getCapability() : Resources.none();
metrics.incrPendingResources(user, metrics.incrPendingResources(user,
@ -505,7 +530,7 @@ public boolean getAndResetBlacklistChanged() {
} }
public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() { public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
return schedulerKeys; return schedulerKeys.keySet();
} }
public synchronized Map<String, ResourceRequest> getResourceRequests( public synchronized Map<String, ResourceRequest> getResourceRequests(
@ -617,7 +642,7 @@ public synchronized List<ResourceRequest> allocate(NodeType type,
} else if (type == NodeType.RACK_LOCAL) { } else if (type == NodeType.RACK_LOCAL) {
allocateRackLocal(node, schedulerKey, request, resourceRequests); allocateRackLocal(node, schedulerKey, request, resourceRequests);
} else { } else {
allocateOffSwitch(request, resourceRequests); allocateOffSwitch(request, resourceRequests, schedulerKey);
} }
QueueMetrics metrics = queue.getMetrics(); QueueMetrics metrics = queue.getMetrics();
if (pending) { if (pending) {
@ -656,7 +681,7 @@ private synchronized void allocateNodeLocal(SchedulerNode node,
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
ResourceRequest.ANY); ResourceRequest.ANY);
decrementOutstanding(offRackRequest); decrementOutstanding(offRackRequest, schedulerKey);
// Update cloned NodeLocal, RackLocal and OffRack requests for recovery // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
@ -684,7 +709,7 @@ private synchronized void allocateRackLocal(SchedulerNode node,
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
ResourceRequest.ANY); ResourceRequest.ANY);
decrementOutstanding(offRackRequest); decrementOutstanding(offRackRequest, schedulerKey);
// Update cloned RackLocal and OffRack requests for recovery // Update cloned RackLocal and OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(rackLocalRequest)); resourceRequests.add(cloneResourceRequest(rackLocalRequest));
@ -696,15 +721,16 @@ private synchronized void allocateRackLocal(SchedulerNode node,
* application. * application.
*/ */
private synchronized void allocateOffSwitch( private synchronized void allocateOffSwitch(
ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) { ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
SchedulerRequestKey schedulerKey) {
// Update future requirements // Update future requirements
decrementOutstanding(offSwitchRequest); decrementOutstanding(offSwitchRequest, schedulerKey);
// Update cloned OffRack requests for recovery // Update cloned OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(offSwitchRequest)); resourceRequests.add(cloneResourceRequest(offSwitchRequest));
} }
private synchronized void decrementOutstanding( private synchronized void decrementOutstanding(
ResourceRequest offSwitchRequest) { ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY // Do not remove ANY
@ -713,6 +739,7 @@ private synchronized void decrementOutstanding(
// Do we have any outstanding requests? // Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application // If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) { if (numOffSwitchContainers == 0) {
decrementSchedulerKeyReference(schedulerKey);
checkForDeactivation(); checkForDeactivation();
} }
@ -723,24 +750,7 @@ private synchronized void decrementOutstanding(
} }
private synchronized void checkForDeactivation() { private synchronized void checkForDeactivation() {
boolean deactivate = true; if (schedulerKeys.isEmpty()) {
for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
ResourceRequest request =
getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (request != null) {
if (request.getNumContainers() > 0) {
deactivate = false;
break;
}
}
}
// also we need to check increase request
if (!deactivate) {
deactivate = containerIncreaseRequestMap.isEmpty();
}
if (deactivate) {
activeUsersManager.deactivateApplication(user, applicationId); activeUsersManager.deactivateApplication(user, applicationId);
} }
} }

View File

@ -23,11 +23,14 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.junit.Assert; import org.junit.Assert;
@ -105,4 +108,66 @@ public void testSchedulerRequestKeyOrdering() {
Assert.assertEquals(2, sk.getPriority().getPriority()); Assert.assertEquals(2, sk.getPriority().getPriority());
Assert.assertEquals(6, sk.getAllocationRequestId()); Assert.assertEquals(6, sk.getAllocationRequestId());
} }
@Test
public void testSchedulerKeyAccounting() {
ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appIdImpl, 1);
Queue queue = mock(Queue.class);
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
new ResourceUsage());
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);
ResourceRequest req1 = ResourceRequest.newInstance(pri1,
ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
Priority pri2 = Priority.newInstance(2);
ResourceRequest req2 = ResourceRequest.newInstance(pri2,
ResourceRequest.ANY, Resource.newInstance(1024, 1), 2);
List<ResourceRequest> reqs = new ArrayList<>();
reqs.add(req1);
reqs.add(req2);
info.updateResourceRequests(reqs, false);
ArrayList<SchedulerRequestKey> keys =
new ArrayList<>(info.getSchedulerKeys());
Assert.assertEquals(2, keys.size());
Assert.assertEquals(SchedulerRequestKey.create(req1), keys.get(0));
Assert.assertEquals(SchedulerRequestKey.create(req2), keys.get(1));
// iterate to verify no ConcurrentModificationException
for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) {
info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null);
}
Assert.assertEquals(1, info.getSchedulerKeys().size());
Assert.assertEquals(SchedulerRequestKey.create(req2),
info.getSchedulerKeys().iterator().next());
req2 = ResourceRequest.newInstance(pri2,
ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
reqs.clear();
reqs.add(req2);
info.updateResourceRequests(reqs, false);
info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2),
req2, null);
Assert.assertEquals(0, info.getSchedulerKeys().size());
req1 = ResourceRequest.newInstance(pri1,
ResourceRequest.ANY, Resource.newInstance(1024, 1), 5);
reqs.clear();
reqs.add(req1);
info.updateResourceRequests(reqs, false);
Assert.assertEquals(1, info.getSchedulerKeys().size());
Assert.assertEquals(SchedulerRequestKey.create(req1),
info.getSchedulerKeys().iterator().next());
req1 = ResourceRequest.newInstance(pri1,
ResourceRequest.ANY, Resource.newInstance(1024, 1), 0);
reqs.clear();
reqs.add(req1);
info.updateResourceRequests(reqs, false);
Assert.assertEquals(0, info.getSchedulerKeys().size());
}
} }